Skip to main content

lance_index/
vector.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Vector Index
5//!
6
7use std::any::Any;
8use std::fmt::Debug;
9use std::{collections::HashMap, sync::Arc};
10
11use arrow_array::{ArrayRef, Float32Array, RecordBatch, UInt32Array};
12use arrow_schema::Field;
13use async_trait::async_trait;
14use datafusion::execution::SendableRecordBatchStream;
15use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
16use deepsize::DeepSizeOf;
17use futures::stream;
18use ivf::storage::IvfModel;
19use lance_core::{Error, ROW_ID_FIELD, Result};
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use quantizer::{QuantizationType, Quantizer};
23use std::sync::LazyLock;
24use v3::subindex::SubIndexType;
25
26pub mod bq;
27pub mod distributed;
28pub mod flat;
29pub mod graph;
30pub mod hnsw;
31pub mod ivf;
32pub mod kmeans;
33pub mod pq;
34pub mod quantizer;
35pub mod residual;
36pub mod shared;
37pub mod sq;
38pub mod storage;
39pub mod transform;
40pub mod utils;
41pub mod v3;
42
43use super::pb;
44use crate::metrics::MetricsCollector;
45use crate::{Index, prefilter::PreFilter};
46
// TODO: Make these crate private once the migration from lance to lance-index is done.
/// Name of the distance column in vector search results (see [`VECTOR_RESULT_SCHEMA`]).
pub const DIST_COL: &str = "_distance";
/// The key string `"distance_type"`.
// NOTE(review): presumably a metadata key under which an index's distance type is stored — confirm.
pub const DISTANCE_TYPE_KEY: &str = "distance_type";
/// The column name `"__index_uuid"`.
// NOTE(review): presumably carries the UUID of the index a row came from — confirm against callers.
pub const INDEX_UUID_COLUMN: &str = "__index_uuid";
/// Name of the partition id column described by [`PART_ID_FIELD`].
pub const PART_ID_COLUMN: &str = "__ivf_part_id";
/// The column name `"__dist_q_c"`.
// NOTE(review): presumably the query-to-centroid distance (cf. `Query::dist_q_c`) — confirm.
pub const DIST_Q_C_COLUMN: &str = "__dist_q_c";
/// Name of the column holding the distance from a vector to its centroid
/// (see [`CENTROID_DIST_FIELD`]).
pub const CENTROID_DIST_COLUMN: &str = "__centroid_dist";
/// The column name `"__pq_code"`.
// NOTE(review): presumably holds product-quantization codes — confirm in the `pq` module.
pub const PQ_CODE_COLUMN: &str = "__pq_code";
/// The column name `"__sq_code"`.
// NOTE(review): presumably holds scalar-quantization codes — confirm in the `sq` module.
pub const SQ_CODE_COLUMN: &str = "__sq_code";
/// The metadata key `"_loss"`.
// NOTE(review): presumably stores a training loss value — confirm against writers.
pub const LOSS_METADATA_KEY: &str = "_loss";
58
/// Opaque, type-erased handle returned by [`VectorIndex::prepare_partition_search`]
/// and consumed by [`VectorIndex::search_prepared_partition`].
///
/// The concrete type behind the `dyn Any` is not fixed here; the handle is
/// `Send`, so it can be moved to another thread before it is executed.
pub type PreparedPartitionSearchHandle = Box<dyn Any + Send>;
60
/// Controls when a multi-partition search should stop producing more partition results.
///
/// The default [`VectorIndex::search_partitions`] implementation calls
/// [`should_stop`](Self::should_stop) before searching each partition and
/// [`record_batch`](Self::record_batch) with every batch it produces.
pub trait PartitionSearchControl: Send + Sync {
    /// Returns `true` when no further partitions should be searched.
    fn should_stop(&self) -> bool;

    /// Observes a per-partition result batch.
    ///
    /// The default implementation does nothing.
    fn record_batch(&self, _batch: &RecordBatch) {}
}
67
68pub static VECTOR_RESULT_SCHEMA: LazyLock<arrow_schema::SchemaRef> = LazyLock::new(|| {
69    arrow_schema::SchemaRef::new(arrow_schema::Schema::new(vec![
70        Field::new(DIST_COL, arrow_schema::DataType::Float32, true),
71        ROW_ID_FIELD.clone(),
72    ]))
73});
74
75pub static PART_ID_FIELD: LazyLock<arrow_schema::Field> = LazyLock::new(|| {
76    arrow_schema::Field::new(PART_ID_COLUMN, arrow_schema::DataType::UInt32, true)
77});
78
79pub static CENTROID_DIST_FIELD: LazyLock<arrow_schema::Field> = LazyLock::new(|| {
80    arrow_schema::Field::new(CENTROID_DIST_COLUMN, arrow_schema::DataType::Float32, true)
81});
82
/// Default value for [`Query::query_parallelism`].
///
/// `0` selects the automatic policy documented on that field.
pub const DEFAULT_QUERY_PARALLELISM: i32 = 0;
84
85/// Query parameters for the vector indices
86
87#[derive(Debug, Clone)]
88pub struct Query {
89    /// The column to be searched.
90    pub column: String,
91
92    /// The vector to be searched.
93    pub key: ArrayRef,
94
95    /// Top k results to return.
96    pub k: usize,
97
98    /// The lower bound (inclusive) of the distance to be searched.
99    pub lower_bound: Option<f32>,
100
101    /// The upper bound (exclusive) of the distance to be searched.
102    pub upper_bound: Option<f32>,
103
104    /// The minimum number of probes to load and search.  More partitions
105    /// will only be loaded if we have not found k results, or the algorithm
106    /// determines more partitions are needed to satisfy recall requirements.
107    ///
108    /// The planner will always search at least this many partitions. Defaults to 1.
109    pub minimum_nprobes: usize,
110
111    /// The maximum number of probes to load and search.  If not set then
112    /// ALL partitions will be searched, if needed, to satisfy k results.
113    pub maximum_nprobes: Option<usize>,
114
115    /// The number of candidates to reserve while searching.
116    /// this is an optional parameter for HNSW related index types.
117    pub ef: Option<usize>,
118
119    /// If presented, apply a refine step.
120    /// TODO: should we support fraction / float number here?
121    pub refine_factor: Option<u32>,
122
123    /// Distance metric type. If None, uses the index's metric (if available)
124    /// or the default for the data type.
125    pub metric_type: Option<DistanceType>,
126
127    /// Whether to use an ANN index if available
128    pub use_index: bool,
129
130    /// Maximum partition-search concurrency for a single vector query.
131    ///
132    /// The default is 0.
133    /// Value 0 selects the automatic policy; today this resolves to 1 for the
134    /// sequential fast path unless an index implementation overrides it.
135    /// Value -1 uses the CPU pool size.
136    /// Value 1 uses the single-worker sequential partition search path.
137    /// Values >= 2 use the partition-parallel path and are clamped to the CPU
138    /// pool size by the execution layer.
139    pub query_parallelism: i32,
140
141    /// the distance between the query and the centroid
142    /// this is only used for IVF index with Rabit quantization
143    pub dist_q_c: f32,
144}
145
146impl From<pb::VectorMetricType> for DistanceType {
147    fn from(proto: pb::VectorMetricType) -> Self {
148        match proto {
149            pb::VectorMetricType::L2 => Self::L2,
150            pb::VectorMetricType::Cosine => Self::Cosine,
151            pb::VectorMetricType::Dot => Self::Dot,
152            pb::VectorMetricType::Hamming => Self::Hamming,
153        }
154    }
155}
156
157impl From<DistanceType> for pb::VectorMetricType {
158    fn from(mt: DistanceType) -> Self {
159        match mt {
160            DistanceType::L2 => Self::L2,
161            DistanceType::Cosine => Self::Cosine,
162            DistanceType::Dot => Self::Dot,
163            DistanceType::Hamming => Self::Hamming,
164        }
165    }
166}
167
168/// Vector Index for (Approximate) Nearest Neighbor (ANN) Search.
169///
170/// Vector indices are often built as a chain of indices.  For example, IVF -> PQ
171/// or IVF -> HNSW -> SQ.
172///
173/// We use one trait for both the top-level and the sub-indices.  Typically the top-level
174/// search is a partition-aware search and all sub-indices are whole-index searches.
175#[async_trait]
176#[allow(clippy::redundant_pub_crate)]
177pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
178    /// Search entire index for k nearest neighbors.
179    ///
180    /// It returns a [RecordBatch] with Schema of:
181    ///
182    /// ```
183    /// use arrow_schema::{Schema, Field, DataType};
184    ///
185    /// Schema::new(vec![
186    ///   Field::new("_rowid", DataType::UInt64, true),
187    ///   Field::new("_distance", DataType::Float32, true),
188    /// ]);
189    /// ```
190    ///
191    /// The `pre_filter` argument is used to filter out row ids that we know are
192    /// not relevant to the query. For example, it removes deleted rows or rows that
193    /// do not match a user-provided filter.
194    async fn search(
195        &self,
196        query: &Query,
197        pre_filter: Arc<dyn PreFilter>,
198        metrics: &dyn MetricsCollector,
199    ) -> Result<RecordBatch>;
200
201    /// Find partitions that may contain nearest neighbors.
202    ///
203    /// If maximum_nprobes is set then this method will return the partitions
204    /// that are most likely to contain the nearest neighbors (e.g. the closest
205    /// partitions to the query vector).
206    ///
207    /// Return the partition ids and the distances between the query and the centroids,
208    /// the results should be in sorted order from closest to farthest.
209    fn find_partitions(&self, query: &Query) -> Result<(UInt32Array, Float32Array)>;
210
211    /// Get the total number of partitions in the index.
212    fn total_partitions(&self) -> usize;
213
214    /// Search a single partition for nearest neighbors.
215    ///
216    /// This method should return the same results as [`VectorIndex::search`] method except
217    /// that it will only search a single partition.
218    async fn search_in_partition(
219        &self,
220        partition_id: usize,
221        query: &Query,
222        pre_filter: Arc<dyn PreFilter>,
223        metrics: &dyn MetricsCollector,
224    ) -> Result<RecordBatch>;
225
226    /// Asynchronously prepare a single-partition search so the CPU-heavy portion
227    /// can be executed separately.
228    async fn prepare_partition_search(
229        &self,
230        _partition_id: usize,
231        _query: &Query,
232        _pre_filter: Arc<dyn PreFilter>,
233        _metrics: &dyn MetricsCollector,
234    ) -> Result<PreparedPartitionSearchHandle> {
235        unimplemented!("prepared partition search is not supported for this index")
236    }
237
238    /// Execute the synchronous portion of a previously prepared partition search.
239    fn search_prepared_partition(
240        &self,
241        _prepared: PreparedPartitionSearchHandle,
242        _metrics: &dyn MetricsCollector,
243    ) -> Result<RecordBatch> {
244        unimplemented!("prepared partition search is not supported for this index")
245    }
246
247    /// Return true if the index supports splitting partition search into async
248    /// prepare and sync execute phases.
249    fn supports_prepared_partition_search(&self) -> bool {
250        false
251    }
252
253    /// Choose partition search concurrency for `query_parallelism = 0`.
254    ///
255    /// The default keeps the single-worker sequential path. Index
256    /// implementations can override this when their sub-index search work does
257    /// not benefit from the sequential fast path.
258    fn auto_query_parallelism(&self, _cpu_pool_size: usize) -> usize {
259        1
260    }
261
262    /// Search a range of partitions and return a stream of per-partition result batches.
263    ///
264    /// The default implementation searches each partition sequentially with
265    /// [`VectorIndex::search_in_partition`]. Implementations can override this
266    /// to use a more efficient execution strategy.
267    #[allow(clippy::too_many_arguments)]
268    async fn search_partitions(
269        self: Arc<Self>,
270        query: Query,
271        partitions: Arc<UInt32Array>,
272        q_c_dists: Arc<Float32Array>,
273        start_idx: usize,
274        end_idx: usize,
275        pre_filter: Arc<dyn PreFilter>,
276        control: Option<Arc<dyn PartitionSearchControl>>,
277        metrics: Arc<dyn MetricsCollector>,
278    ) -> Result<SendableRecordBatchStream>
279    where
280        Self: 'static,
281    {
282        if partitions.len() != q_c_dists.len() {
283            return Err(Error::invalid_input(format!(
284                "partition count {} does not match centroid distance count {}",
285                partitions.len(),
286                q_c_dists.len()
287            )));
288        }
289        if start_idx > end_idx || end_idx > partitions.len() {
290            return Err(Error::invalid_input(format!(
291                "invalid partition search range [{start_idx}, {end_idx}) for {} partitions",
292                partitions.len()
293            )));
294        }
295
296        let stream = stream::try_unfold(start_idx, move |idx| {
297            let index = self.clone();
298            let partitions = partitions.clone();
299            let q_c_dists = q_c_dists.clone();
300            let query = query.clone();
301            let pre_filter = pre_filter.clone();
302            let control = control.clone();
303            let metrics = metrics.clone();
304            async move {
305                if idx >= end_idx
306                    || control
307                        .as_ref()
308                        .is_some_and(|control| control.should_stop())
309                {
310                    return Ok(None);
311                }
312                let part_id = partitions.value(idx);
313                let mut query = query;
314                query.dist_q_c = q_c_dists.value(idx);
315                index
316                    .search_in_partition(part_id as usize, &query, pre_filter, metrics.as_ref())
317                    .await
318                    .map(|batch| {
319                        if let Some(control) = control.as_ref() {
320                            control.record_batch(&batch);
321                        }
322                        Some((batch, idx + 1))
323                    })
324                    .map_err(Into::into)
325            }
326        });
327        Ok(Box::pin(RecordBatchStreamAdapter::new(
328            VECTOR_RESULT_SCHEMA.clone(),
329            stream,
330        )))
331    }
332
333    /// If the index is loadable by IVF, so it can be a sub-index that
334    /// is loaded on demand by IVF.
335    fn is_loadable(&self) -> bool;
336
337    /// Use residual vector to search.
338    fn use_residual(&self) -> bool;
339
340    // async fn append(&self, batches: Vec<RecordBatch>) -> Result<()>;
341    // async fn merge(&self, indices: Vec<Arc<dyn VectorIndex>>) -> Result<()>;
342
343    /// Load the index from the reader on-demand.
344    async fn load(
345        &self,
346        reader: Arc<dyn Reader>,
347        offset: usize,
348        length: usize,
349    ) -> Result<Box<dyn VectorIndex>>;
350
351    /// Load the partition from the reader on-demand.
352    async fn load_partition(
353        &self,
354        reader: Arc<dyn Reader>,
355        offset: usize,
356        length: usize,
357        _partition_id: usize,
358    ) -> Result<Box<dyn VectorIndex>> {
359        self.load(reader, offset, length).await
360    }
361
362    // for IVF only
363    async fn partition_reader(
364        &self,
365        _partition_id: usize,
366        _with_vector: bool,
367        _metrics: &dyn MetricsCollector,
368    ) -> Result<SendableRecordBatchStream> {
369        unimplemented!("only for IVF")
370    }
371
372    // for SubIndex only
373    async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream>;
374
375    fn num_rows(&self) -> u64;
376
377    /// Return the IDs of rows in the index.
378    fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_>;
379
380    /// Remap the index according to mapping
381    ///
382    /// Each item in mapping describes an old row id -> new row id
383    /// pair.  If old row id -> None then that row id has been
384    /// deleted and can be removed from the index.
385    ///
386    /// If an old row id is not in the mapping then it should be
387    /// left alone.
388    async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;
389
390    /// The metric type of this vector index.
391    fn metric_type(&self) -> DistanceType;
392
393    fn ivf_model(&self) -> &IvfModel;
394    fn quantizer(&self) -> Quantizer;
395    fn partition_size(&self, part_id: usize) -> usize;
396
397    /// the index type of this vector index.
398    fn sub_index_type(&self) -> (SubIndexType, QuantizationType);
399}
400
/// An entry that can be an IVF index or a partition of an IVF index.
// NOTE(review): the name and the `DeepSizeOf` bound suggest these entries live
// in a size-accounted cache — confirm against the cache implementation.
pub trait VectorIndexCacheEntry: Debug + Send + Sync + DeepSizeOf {
    /// Returns the entry as `&dyn Any` (presumably so callers can downcast it
    /// to its concrete type — confirm against callers).
    fn as_any(&self) -> &dyn Any;
}