lance_index/vector/storage.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Vector Storage, holding (quantized) vectors and providing distance calculation.

use crate::vector::quantizer::QuantizerStorage;
use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
use deepsize::DeepSizeOf;
use futures::prelude::stream::TryStreamExt;
use lance_arrow::RecordBatchExt;
use lance_core::{Error, Result, ROW_ID};
use lance_encoding::decoder::FilterExpression;
use lance_file::reader::FileReader;
use lance_io::ReadBatchParams;
use lance_linalg::distance::DistanceType;
use prost::Message;
use snafu::location;
use std::{any::Any, sync::Arc};

use crate::frag_reuse::FragReuseIndex;
use crate::{
    pb,
    vector::{
        ivf::storage::{IvfModel, IVF_METADATA_KEY},
        quantizer::Quantization,
    },
};

use super::quantizer::{Quantizer, QuantizerMetadata};
use super::DISTANCE_TYPE_KEY;

/// <section class="warning">
///  Internal API
///
///  API stability is not guaranteed
/// </section>
pub trait DistCalculator {
    fn distance(&self, id: u32) -> f32;

    /// Return the distances of all rows.
    /// `k_hint` is a hint that implementations can use for optimization.
    fn distance_all(&self, k_hint: usize) -> Vec<f32>;

    fn prefetch(&self, _id: u32) {}
}
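
// Illustrative sketch (not part of the public API): scoring a list of candidate
// vector ids with a `DistCalculator`. Prefetching the next candidate while the
// current distance is computed is one intended use of `prefetch`; the default
// implementation is a no-op, so the hint is free to ignore.
#[allow(dead_code)]
fn score_candidates_sketch(dc: &impl DistCalculator, candidates: &[u32]) -> Vec<f32> {
    candidates
        .iter()
        .enumerate()
        .map(|(i, &id)| {
            // Hint the next id before computing the current distance.
            if let Some(&next) = candidates.get(i + 1) {
                dc.prefetch(next);
            }
            dc.distance(id)
        })
        .collect()
}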

pub const STORAGE_METADATA_KEY: &str = "storage_metadata";

/// Vector Storage is the abstraction to store the vectors.
///
/// It can be in-memory or on-disk, raw vectors or quantized vectors.
///
/// It abstracts away the logic to compute the distance between vectors.
///
/// TODO: should we rename this to "VectorDistance"?
///
/// <section class="warning">
///  Internal API
///
///  API stability is not guaranteed
/// </section>
pub trait VectorStore: Send + Sync + Sized + Clone {
    type DistanceCalculator<'a>: DistCalculator
    where
        Self: 'a;

    fn as_any(&self) -> &dyn Any;

    fn schema(&self) -> &SchemaRef;

    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;

    fn len(&self) -> usize;

    /// Returns true if this storage is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Return the [DistanceType] used by this storage.
    fn distance_type(&self) -> DistanceType;

    /// Get the lance ROW ID of one vector.
    fn row_id(&self, id: u32) -> u64;

    fn row_ids(&self) -> impl Iterator<Item = &u64>;

    /// Append a raw [RecordBatch] to the storage.
    /// The storage implementation will perform quantization if necessary.
    fn append_batch(&self, batch: RecordBatch, vector_column: &str) -> Result<Self>;

    /// Create a [DistCalculator] to compute distances between the query and the stored vectors.
    ///
    /// Using a dist calculator can be more efficient, as it can pre-compute some
    /// values.
    fn dist_calculator(&self, query: ArrayRef, dist_q_c: f32) -> Self::DistanceCalculator<'_>;

    fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;

    fn dist_between(&self, u: u32, v: u32) -> f32 {
        let dist_cal_u = self.dist_calculator_from_id(u);
        dist_cal_u.distance(v)
    }
}
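
// Illustrative sketch: a brute-force scan over a `VectorStore` to find the row id
// of the vector closest to a query. `dist_q_c` is passed as 0.0 here; it appears
// to be a precomputed query-to-centroid distance used by some quantized stores,
// and 0.0 is only a placeholder for stores that ignore it.
#[allow(dead_code)]
fn nearest_row_id_sketch(store: &impl VectorStore, query: ArrayRef) -> Option<u64> {
    let dc = store.dist_calculator(query, 0.0);
    // Ask for all distances, hinting that only the single best result is needed.
    dc.distance_all(1)
        .into_iter()
        .enumerate()
        .min_by(|(_, a), (_, b)| a.total_cmp(b))
        .map(|(id, _)| store.row_id(id as u32))
}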

pub struct StorageBuilder<Q: Quantization> {
    vector_column: String,
    distance_type: DistanceType,
    quantizer: Q,

    frag_reuse_index: Option<Arc<FragReuseIndex>>,
}

impl<Q: Quantization> StorageBuilder<Q> {
    pub fn new(
        vector_column: String,
        distance_type: DistanceType,
        quantizer: Q,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
    ) -> Result<Self> {
        Ok(Self {
            vector_column,
            distance_type,
            quantizer,
            frag_reuse_index,
        })
    }

    pub fn build(&self, batches: Vec<RecordBatch>) -> Result<Q::Storage> {
        let mut batch = concat_batches(batches[0].schema_ref(), batches.iter())?;

        if batch.column_by_name(self.quantizer.column()).is_none() {
            let vectors = batch
                .column_by_name(&self.vector_column)
                .ok_or(Error::Index {
                    message: format!("Vector column {} not found in batch", self.vector_column),
                    location: location!(),
                })?;
            let codes = self.quantizer.quantize(vectors)?;
            batch = batch.drop_column(&self.vector_column)?.try_with_column(
                arrow_schema::Field::new(self.quantizer.column(), codes.data_type().clone(), true),
                codes,
            )?;
        }

        debug_assert!(batch.column_by_name(ROW_ID).is_some());
        debug_assert!(batch.column_by_name(self.quantizer.column()).is_some());

        Q::Storage::try_from_batch(
            batch,
            &self.quantizer.metadata(None),
            self.distance_type,
            self.frag_reuse_index.clone(),
        )
    }
}
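
// Illustrative sketch: building quantized storage from in-memory batches with
// `StorageBuilder`. The column name "vector" and the L2 distance type are
// assumptions for the example; callers supply a trained quantizer and batches
// that already carry a ROW_ID column.
#[allow(dead_code)]
fn build_storage_sketch<Q: Quantization>(
    quantizer: Q,
    batches: Vec<RecordBatch>,
) -> Result<Q::Storage> {
    StorageBuilder::new("vector".to_owned(), DistanceType::L2, quantizer, None)?.build(batches)
}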

/// Loader to load partitioned quantized storage (e.g. PQ) from disk.
#[derive(Debug)]
pub struct IvfQuantizationStorage<Q: Quantization> {
    reader: FileReader,

    distance_type: DistanceType,
    metadata: Q::Metadata,

    ivf: IvfModel,
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
}

impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        self.metadata.deep_size_of_children(context) + self.ivf.deep_size_of_children(context)
    }
}

impl<Q: Quantization> IvfQuantizationStorage<Q> {
    /// Open a loader.
    pub async fn try_new(
        reader: FileReader,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
    ) -> Result<Self> {
        let schema = reader.schema();

        let distance_type = DistanceType::try_from(
            schema
                .metadata
                .get(DISTANCE_TYPE_KEY)
                .ok_or(Error::Index {
                    message: format!("{} not found", DISTANCE_TYPE_KEY),
                    location: location!(),
                })?
                .as_str(),
        )?;

        let ivf_pos = schema
            .metadata
            .get(IVF_METADATA_KEY)
            .ok_or(Error::Index {
                message: format!("{} not found", IVF_METADATA_KEY),
                location: location!(),
            })?
            .parse()
            .map_err(|e| Error::Index {
                message: format!("Failed to decode IVF metadata: {}", e),
                location: location!(),
            })?;
        let ivf_bytes = reader.read_global_buffer(ivf_pos).await?;
        let ivf = IvfModel::try_from(pb::Ivf::decode(ivf_bytes)?)?;

        let mut metadata: Vec<String> = serde_json::from_str(
            schema
                .metadata
                .get(STORAGE_METADATA_KEY)
                .ok_or(Error::Index {
                    message: format!("{} not found", STORAGE_METADATA_KEY),
                    location: location!(),
                })?
                .as_str(),
        )?;
        debug_assert_eq!(metadata.len(), 1);
        // For now the metadata is the same for all partitions, so we just store one.
        let metadata = metadata.pop().ok_or(Error::Index {
            message: "metadata is empty".to_string(),
            location: location!(),
        })?;
        let mut metadata: Q::Metadata = serde_json::from_str(&metadata)?;
        // We store large metadata (e.g. the PQ codebook) in a global buffer,
        // and the schema metadata just contains a pointer to that buffer.
        if let Some(pos) = metadata.buffer_index() {
            let bytes = reader.read_global_buffer(pos).await?;
            metadata.parse_buffer(bytes)?;
        }

        Ok(Self {
            reader,
            distance_type,
            metadata,
            ivf,
            frag_reuse_index,
        })
    }

    pub fn num_rows(&self) -> u64 {
        self.reader.num_rows()
    }

    pub fn partition_size(&self, part_id: usize) -> usize {
        self.ivf.partition_size(part_id)
    }

    pub fn quantizer(&self) -> Result<Quantizer> {
        let metadata = self.metadata();
        Q::from_metadata(metadata, self.distance_type)
    }

    pub fn metadata(&self) -> &Q::Metadata {
        &self.metadata
    }

    pub fn distance_type(&self) -> DistanceType {
        self.distance_type
    }

    pub fn schema(&self) -> SchemaRef {
        Arc::new(self.reader.schema().as_ref().into())
    }

    /// Get the number of partitions in the storage.
    pub fn num_partitions(&self) -> usize {
        self.ivf.num_partitions()
    }

    pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
        let range = self.ivf.row_range(part_id);
        let batch = if range.is_empty() {
            let schema = self.reader.schema();
            let arrow_schema = arrow_schema::Schema::from(schema.as_ref());
            RecordBatch::new_empty(Arc::new(arrow_schema))
        } else {
            let batches = self
                .reader
                .read_stream(
                    ReadBatchParams::Range(range),
                    u32::MAX,
                    1,
                    FilterExpression::no_filter(),
                )?
                .try_collect::<Vec<_>>()
                .await?;
            let schema = Arc::new(self.reader.schema().as_ref().into());
            concat_batches(&schema, batches.iter())?
        };
        Q::Storage::try_from_batch(
            batch,
            self.metadata(),
            self.distance_type,
            self.frag_reuse_index.clone(),
        )
    }
}
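
// Illustrative sketch: loading every partition of an on-disk index sequentially.
// In practice callers load only the partitions selected by the IVF coarse search;
// this just demonstrates the `num_partitions` / `load_partition` loop.
#[allow(dead_code)]
async fn load_all_partitions_sketch<Q: Quantization>(
    storage: &IvfQuantizationStorage<Q>,
) -> Result<Vec<Q::Storage>> {
    let mut partitions = Vec::with_capacity(storage.num_partitions());
    for part_id in 0..storage.num_partitions() {
        partitions.push(storage.load_partition(part_id).await?);
    }
    Ok(partitions)
}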