1use std::any::Any;
8use std::fmt::Debug;
9use std::{collections::HashMap, sync::Arc};
10
11use arrow_array::{ArrayRef, Float32Array, RecordBatch, UInt32Array};
12use arrow_schema::Field;
13use async_trait::async_trait;
14use datafusion::execution::SendableRecordBatchStream;
15use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
16use deepsize::DeepSizeOf;
17use futures::stream;
18use ivf::storage::IvfModel;
19use lance_core::{Error, ROW_ID_FIELD, Result};
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use quantizer::{QuantizationType, Quantizer};
23use std::sync::LazyLock;
24use v3::subindex::SubIndexType;
25
26pub mod bq;
27pub mod distributed;
28pub mod flat;
29pub mod graph;
30pub mod hnsw;
31pub mod ivf;
32pub mod kmeans;
33pub mod pq;
34pub mod quantizer;
35pub mod residual;
36pub mod shared;
37pub mod sq;
38pub mod storage;
39pub mod transform;
40pub mod utils;
41pub mod v3;
42
43use super::pb;
44use crate::metrics::MetricsCollector;
45use crate::{Index, prefilter::PreFilter};
46
/// Name of the distance column in vector search results (a nullable `Float32` column in
/// `VECTOR_RESULT_SCHEMA`).
pub const DIST_COL: &'static str = "_distance";
/// Key under which the distance type is recorded (presumably in index metadata — confirm).
pub const DISTANCE_TYPE_KEY: &'static str = "distance_type";
/// Internal column name carrying an index UUID.
pub const INDEX_UUID_COLUMN: &'static str = "__index_uuid";
/// Internal column name for IVF partition ids (a nullable `UInt32` field, see `PART_ID_FIELD`).
pub const PART_ID_COLUMN: &'static str = "__ivf_part_id";
/// Internal column name for query-to-centroid distances (presumably matching `Query::dist_q_c`).
pub const DIST_Q_C_COLUMN: &'static str = "__dist_q_c";
/// Internal column name for centroid distances (a nullable `Float32` field, see
/// `CENTROID_DIST_FIELD`).
pub const CENTROID_DIST_COLUMN: &'static str = "__centroid_dist";
/// Internal column name for product-quantization codes (per its name — confirm).
pub const PQ_CODE_COLUMN: &'static str = "__pq_code";
/// Internal column name for scalar-quantization codes (per its name — confirm).
pub const SQ_CODE_COLUMN: &'static str = "__sq_code";
/// Metadata key named `_loss` (presumably a training loss value — confirm).
pub const LOSS_METADATA_KEY: &'static str = "_loss";
58
/// Type-erased, sendable state returned by `VectorIndex::prepare_partition_search` and
/// handed back to `VectorIndex::search_prepared_partition`.
///
/// Implementors presumably downcast it back to their own concrete type via [`Any`].
pub type PreparedPartitionSearchHandle = Box<dyn Any + Send + 'static>;
60
61pub trait PartitionSearchControl: Send + Sync {
63 fn should_stop(&self) -> bool;
64
65 fn record_batch(&self, _batch: &RecordBatch) {}
66}
67
68pub static VECTOR_RESULT_SCHEMA: LazyLock<arrow_schema::SchemaRef> = LazyLock::new(|| {
69 arrow_schema::SchemaRef::new(arrow_schema::Schema::new(vec![
70 Field::new(DIST_COL, arrow_schema::DataType::Float32, true),
71 ROW_ID_FIELD.clone(),
72 ]))
73});
74
75pub static PART_ID_FIELD: LazyLock<arrow_schema::Field> = LazyLock::new(|| {
76 arrow_schema::Field::new(PART_ID_COLUMN, arrow_schema::DataType::UInt32, true)
77});
78
79pub static CENTROID_DIST_FIELD: LazyLock<arrow_schema::Field> = LazyLock::new(|| {
80 arrow_schema::Field::new(CENTROID_DIST_COLUMN, arrow_schema::DataType::Float32, true)
81});
82
/// Default value for [`Query::query_parallelism`].
///
/// NOTE(review): what `0` means (e.g. "pick automatically" via
/// `VectorIndex::auto_query_parallelism`) is not established here — confirm.
pub const DEFAULT_QUERY_PARALLELISM: i32 = 0_i32;
84
85#[derive(Debug, Clone)]
88pub struct Query {
89 pub column: String,
91
92 pub key: ArrayRef,
94
95 pub k: usize,
97
98 pub lower_bound: Option<f32>,
100
101 pub upper_bound: Option<f32>,
103
104 pub minimum_nprobes: usize,
110
111 pub maximum_nprobes: Option<usize>,
114
115 pub ef: Option<usize>,
118
119 pub refine_factor: Option<u32>,
122
123 pub metric_type: Option<DistanceType>,
126
127 pub use_index: bool,
129
130 pub query_parallelism: i32,
140
141 pub dist_q_c: f32,
144}
145
146impl From<pb::VectorMetricType> for DistanceType {
147 fn from(proto: pb::VectorMetricType) -> Self {
148 match proto {
149 pb::VectorMetricType::L2 => Self::L2,
150 pb::VectorMetricType::Cosine => Self::Cosine,
151 pb::VectorMetricType::Dot => Self::Dot,
152 pb::VectorMetricType::Hamming => Self::Hamming,
153 }
154 }
155}
156
157impl From<DistanceType> for pb::VectorMetricType {
158 fn from(mt: DistanceType) -> Self {
159 match mt {
160 DistanceType::L2 => Self::L2,
161 DistanceType::Cosine => Self::Cosine,
162 DistanceType::Dot => Self::Dot,
163 DistanceType::Hamming => Self::Hamming,
164 }
165 }
166}
167
168#[async_trait]
176#[allow(clippy::redundant_pub_crate)]
177pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
178 async fn search(
195 &self,
196 query: &Query,
197 pre_filter: Arc<dyn PreFilter>,
198 metrics: &dyn MetricsCollector,
199 ) -> Result<RecordBatch>;
200
201 fn find_partitions(&self, query: &Query) -> Result<(UInt32Array, Float32Array)>;
210
211 fn total_partitions(&self) -> usize;
213
214 async fn search_in_partition(
219 &self,
220 partition_id: usize,
221 query: &Query,
222 pre_filter: Arc<dyn PreFilter>,
223 metrics: &dyn MetricsCollector,
224 ) -> Result<RecordBatch>;
225
226 async fn prepare_partition_search(
229 &self,
230 _partition_id: usize,
231 _query: &Query,
232 _pre_filter: Arc<dyn PreFilter>,
233 _metrics: &dyn MetricsCollector,
234 ) -> Result<PreparedPartitionSearchHandle> {
235 unimplemented!("prepared partition search is not supported for this index")
236 }
237
238 fn search_prepared_partition(
240 &self,
241 _prepared: PreparedPartitionSearchHandle,
242 _metrics: &dyn MetricsCollector,
243 ) -> Result<RecordBatch> {
244 unimplemented!("prepared partition search is not supported for this index")
245 }
246
247 fn supports_prepared_partition_search(&self) -> bool {
250 false
251 }
252
253 fn auto_query_parallelism(&self, _cpu_pool_size: usize) -> usize {
259 1
260 }
261
262 #[allow(clippy::too_many_arguments)]
268 async fn search_partitions(
269 self: Arc<Self>,
270 query: Query,
271 partitions: Arc<UInt32Array>,
272 q_c_dists: Arc<Float32Array>,
273 start_idx: usize,
274 end_idx: usize,
275 pre_filter: Arc<dyn PreFilter>,
276 control: Option<Arc<dyn PartitionSearchControl>>,
277 metrics: Arc<dyn MetricsCollector>,
278 ) -> Result<SendableRecordBatchStream>
279 where
280 Self: 'static,
281 {
282 if partitions.len() != q_c_dists.len() {
283 return Err(Error::invalid_input(format!(
284 "partition count {} does not match centroid distance count {}",
285 partitions.len(),
286 q_c_dists.len()
287 )));
288 }
289 if start_idx > end_idx || end_idx > partitions.len() {
290 return Err(Error::invalid_input(format!(
291 "invalid partition search range [{start_idx}, {end_idx}) for {} partitions",
292 partitions.len()
293 )));
294 }
295
296 let stream = stream::try_unfold(start_idx, move |idx| {
297 let index = self.clone();
298 let partitions = partitions.clone();
299 let q_c_dists = q_c_dists.clone();
300 let query = query.clone();
301 let pre_filter = pre_filter.clone();
302 let control = control.clone();
303 let metrics = metrics.clone();
304 async move {
305 if idx >= end_idx
306 || control
307 .as_ref()
308 .is_some_and(|control| control.should_stop())
309 {
310 return Ok(None);
311 }
312 let part_id = partitions.value(idx);
313 let mut query = query;
314 query.dist_q_c = q_c_dists.value(idx);
315 index
316 .search_in_partition(part_id as usize, &query, pre_filter, metrics.as_ref())
317 .await
318 .map(|batch| {
319 if let Some(control) = control.as_ref() {
320 control.record_batch(&batch);
321 }
322 Some((batch, idx + 1))
323 })
324 .map_err(Into::into)
325 }
326 });
327 Ok(Box::pin(RecordBatchStreamAdapter::new(
328 VECTOR_RESULT_SCHEMA.clone(),
329 stream,
330 )))
331 }
332
333 fn is_loadable(&self) -> bool;
336
337 fn use_residual(&self) -> bool;
339
340 async fn load(
345 &self,
346 reader: Arc<dyn Reader>,
347 offset: usize,
348 length: usize,
349 ) -> Result<Box<dyn VectorIndex>>;
350
351 async fn load_partition(
353 &self,
354 reader: Arc<dyn Reader>,
355 offset: usize,
356 length: usize,
357 _partition_id: usize,
358 ) -> Result<Box<dyn VectorIndex>> {
359 self.load(reader, offset, length).await
360 }
361
362 async fn partition_reader(
364 &self,
365 _partition_id: usize,
366 _with_vector: bool,
367 _metrics: &dyn MetricsCollector,
368 ) -> Result<SendableRecordBatchStream> {
369 unimplemented!("only for IVF")
370 }
371
372 async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream>;
374
375 fn num_rows(&self) -> u64;
376
377 fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_>;
379
380 async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;
389
390 fn metric_type(&self) -> DistanceType;
392
393 fn ivf_model(&self) -> &IvfModel;
394 fn quantizer(&self) -> Quantizer;
395 fn partition_size(&self, part_id: usize) -> usize;
396
397 fn sub_index_type(&self) -> (SubIndexType, QuantizationType);
399}
400
401pub trait VectorIndexCacheEntry: Debug + Send + Sync + DeepSizeOf {
403 fn as_any(&self) -> &dyn Any;
404}