1use std::collections::HashMap;
8use std::sync::Arc;
9
10use arrow::array::AsArray;
11use arrow_array::{Array, ArrayRef, Float32Array, RecordBatch, UInt64Array};
12use arrow_schema::{DataType, Field, Schema, SchemaRef};
13use deepsize::DeepSizeOf;
14use itertools::Itertools;
15use lance_core::{Error, Result, ROW_ID_FIELD};
16use lance_file::reader::FileReader;
17use lance_linalg::distance::DistanceType;
18use serde::{Deserialize, Serialize};
19use snafu::location;
20
21use crate::{
22 metrics::MetricsCollector,
23 prefilter::PreFilter,
24 vector::{
25 graph::{OrderedFloat, OrderedNode},
26 quantizer::{Quantization, QuantizationType, Quantizer, QuantizerMetadata},
27 storage::{DistCalculator, VectorStore},
28 v3::subindex::IvfSubIndex,
29 Query, DIST_COL,
30 },
31};
32
33use super::storage::{FlatBinStorage, FlatFloatStorage, FLAT_COLUMN};
34
35#[derive(Debug, Clone, Default, DeepSizeOf)]
38pub struct FlatIndex {}
39
40use std::sync::LazyLock;
41
42static ANN_SEARCH_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
43 Schema::new(vec![
44 Field::new(DIST_COL, DataType::Float32, true),
45 ROW_ID_FIELD.clone(),
46 ])
47 .into()
48});
49
/// Optional distance bounds consumed by `FlatIndex::search`.
///
/// When at least one bound is set, the search only keeps results whose distance `d`
/// satisfies `lower_bound <= d < upper_bound`.
#[derive(Default)]
pub struct FlatQueryParams {
    /// Inclusive lower distance bound, if any.
    lower_bound: Option<f32>,
    /// Exclusive upper distance bound, if any.
    upper_bound: Option<f32>,
}
55
56impl From<&Query> for FlatQueryParams {
57 fn from(q: &Query) -> Self {
58 Self {
59 lower_bound: q.lower_bound,
60 upper_bound: q.upper_bound,
61 }
62 }
63}
64
65impl IvfSubIndex for FlatIndex {
66 type QueryParams = FlatQueryParams;
67 type BuildParams = ();
68
69 fn name() -> &'static str {
70 "FLAT"
71 }
72
73 fn metadata_key() -> &'static str {
74 "lance:flat"
75 }
76
77 fn schema() -> arrow_schema::SchemaRef {
78 Schema::new(vec![Field::new("__flat_marker", DataType::UInt64, false)]).into()
79 }
80
81 fn search(
82 &self,
83 query: ArrayRef,
84 k: usize,
85 params: Self::QueryParams,
86 storage: &impl VectorStore,
87 prefilter: Arc<dyn PreFilter>,
88 metrics: &dyn MetricsCollector,
89 ) -> Result<RecordBatch> {
90 let is_range_query = params.lower_bound.is_some() || params.upper_bound.is_some();
91 let dist_calc = storage.dist_calculator(query);
92 metrics.record_comparisons(storage.len());
93
94 let res = match prefilter.is_empty() {
95 true => {
96 let iter = dist_calc
97 .distance_all(k)
98 .into_iter()
99 .zip(0..storage.len() as u32)
100 .map(|(dist, id)| OrderedNode::new(id, dist.into()));
101 if is_range_query {
102 let lower_bound = params.lower_bound.unwrap_or(f32::MIN);
103 let upper_bound = params.upper_bound.unwrap_or(f32::MAX);
104 iter.filter(|r| lower_bound <= r.dist.0 && r.dist.0 < upper_bound)
105 .sorted_unstable()
106 } else {
107 iter.sorted_unstable()
108 }
109 }
110 false => {
111 let row_id_mask = prefilter.mask();
112 let iter = (0..storage.len())
113 .filter(|&id| row_id_mask.selected(storage.row_id(id as u32)))
114 .map(|id| OrderedNode {
115 id: id as u32,
116 dist: OrderedFloat(dist_calc.distance(id as u32)),
117 });
118 if is_range_query {
119 let lower_bound = params.lower_bound.unwrap_or(f32::MIN);
120 let upper_bound = params.upper_bound.unwrap_or(f32::MAX);
121 iter.filter(|r| lower_bound <= r.dist.0 && r.dist.0 < upper_bound)
122 .sorted_unstable()
123 } else {
124 iter.sorted_unstable()
125 }
126 }
127 };
128
129 let (row_ids, dists): (Vec<_>, Vec<_>) = res
130 .take(k)
131 .map(|r| (storage.row_id(r.id), r.dist.0))
132 .unzip();
133 let (row_ids, dists) = (UInt64Array::from(row_ids), Float32Array::from(dists));
134
135 Ok(RecordBatch::try_new(
136 ANN_SEARCH_SCHEMA.clone(),
137 vec![Arc::new(dists), Arc::new(row_ids)],
138 )?)
139 }
140
141 fn load(_: RecordBatch) -> Result<Self> {
142 Ok(Self {})
143 }
144
145 fn index_vectors(_: &impl VectorStore, _: Self::BuildParams) -> Result<Self>
146 where
147 Self: Sized,
148 {
149 Ok(Self {})
150 }
151
152 fn remap(&self, _: &HashMap<u64, Option<u64>>) -> Result<Self> {
153 Ok(self.clone())
154 }
155
156 fn to_batch(&self) -> Result<RecordBatch> {
157 Ok(RecordBatch::new_empty(Schema::empty().into()))
158 }
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, DeepSizeOf)]
162pub struct FlatMetadata {
163 pub dim: usize,
164}
165
166#[async_trait::async_trait]
167impl QuantizerMetadata for FlatMetadata {
168 async fn load(_: &FileReader) -> Result<Self> {
169 unimplemented!("Flat will be used in new index builder which doesn't require this")
170 }
171}
172
173#[derive(Debug, Clone, DeepSizeOf)]
174pub struct FlatQuantizer {
175 dim: usize,
176 distance_type: DistanceType,
177}
178
179impl FlatQuantizer {
180 pub fn new(dim: usize, distance_type: DistanceType) -> Self {
181 Self { dim, distance_type }
182 }
183}
184
185impl Quantization for FlatQuantizer {
186 type BuildParams = ();
187 type Metadata = FlatMetadata;
188 type Storage = FlatFloatStorage;
189
190 fn build(data: &dyn Array, distance_type: DistanceType, _: &Self::BuildParams) -> Result<Self> {
191 let dim = data.as_fixed_size_list().value_length();
192 Ok(Self::new(dim as usize, distance_type))
193 }
194
195 fn retrain(&mut self, _: &dyn Array) -> Result<()> {
196 Ok(())
197 }
198
199 fn code_dim(&self) -> usize {
200 self.dim
201 }
202
203 fn column(&self) -> &'static str {
204 FLAT_COLUMN
205 }
206
207 fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer> {
208 Ok(Quantizer::Flat(Self {
209 dim: metadata.dim,
210 distance_type,
211 }))
212 }
213
214 fn metadata(&self, _: Option<crate::vector::quantizer::QuantizationMetadata>) -> FlatMetadata {
215 FlatMetadata { dim: self.dim }
216 }
217
218 fn metadata_key() -> &'static str {
219 "flat"
220 }
221
222 fn quantization_type() -> QuantizationType {
223 QuantizationType::Flat
224 }
225
226 fn quantize(&self, vectors: &dyn Array) -> Result<ArrayRef> {
227 Ok(vectors.slice(0, vectors.len()))
228 }
229
230 fn field(&self) -> Field {
231 Field::new(
232 FLAT_COLUMN,
233 DataType::FixedSizeList(
234 Arc::new(Field::new("item", DataType::Float32, true)),
235 self.dim as i32,
236 ),
237 true,
238 )
239 }
240}
241
242impl From<FlatQuantizer> for Quantizer {
243 fn from(value: FlatQuantizer) -> Self {
244 Self::Flat(value)
245 }
246}
247
248impl TryFrom<Quantizer> for FlatQuantizer {
249 type Error = Error;
250
251 fn try_from(value: Quantizer) -> Result<Self> {
252 match value {
253 Quantizer::Flat(quantizer) => Ok(quantizer),
254 _ => Err(Error::invalid_input(
255 "quantizer is not FlatQuantizer",
256 location!(),
257 )),
258 }
259 }
260}
261
262#[derive(Debug, Clone, DeepSizeOf)]
263pub struct FlatBinQuantizer {
264 dim: usize,
265 distance_type: DistanceType,
266}
267
268impl FlatBinQuantizer {
269 pub fn new(dim: usize, distance_type: DistanceType) -> Self {
270 Self { dim, distance_type }
271 }
272}
273
274impl Quantization for FlatBinQuantizer {
275 type BuildParams = ();
276 type Metadata = FlatMetadata;
277 type Storage = FlatBinStorage;
278
279 fn build(data: &dyn Array, distance_type: DistanceType, _: &Self::BuildParams) -> Result<Self> {
280 let dim = data.as_fixed_size_list().value_length();
281 Ok(Self::new(dim as usize, distance_type))
282 }
283
284 fn retrain(&mut self, _: &dyn Array) -> Result<()> {
285 Ok(())
286 }
287
288 fn code_dim(&self) -> usize {
289 self.dim
290 }
291
292 fn column(&self) -> &'static str {
293 FLAT_COLUMN
294 }
295
296 fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer> {
297 Ok(Quantizer::FlatBin(Self {
298 dim: metadata.dim,
299 distance_type,
300 }))
301 }
302
303 fn metadata(&self, _: Option<crate::vector::quantizer::QuantizationMetadata>) -> FlatMetadata {
304 FlatMetadata { dim: self.dim }
305 }
306
307 fn metadata_key() -> &'static str {
308 "flat"
309 }
310
311 fn quantization_type() -> QuantizationType {
312 QuantizationType::Flat
313 }
314
315 fn quantize(&self, vectors: &dyn Array) -> Result<ArrayRef> {
316 Ok(vectors.slice(0, vectors.len()))
317 }
318
319 fn field(&self) -> Field {
320 Field::new(
321 FLAT_COLUMN,
322 DataType::FixedSizeList(
323 Arc::new(Field::new("item", DataType::UInt8, true)),
324 self.dim as i32,
325 ),
326 true,
327 )
328 }
329}
330
331impl From<FlatBinQuantizer> for Quantizer {
332 fn from(value: FlatBinQuantizer) -> Self {
333 Self::FlatBin(value)
334 }
335}
336
337impl TryFrom<Quantizer> for FlatBinQuantizer {
338 type Error = Error;
339
340 fn try_from(value: Quantizer) -> Result<Self> {
341 match value {
342 Quantizer::FlatBin(quantizer) => Ok(quantizer),
343 _ => Err(Error::invalid_input(
344 "quantizer is not FlatBinQuantizer",
345 location!(),
346 )),
347 }
348 }
349}