1use crate::vector::quantizer::QuantizerStorage;
7use arrow::compute::concat_batches;
8use arrow_array::{ArrayRef, RecordBatch};
9use arrow_schema::SchemaRef;
10use deepsize::DeepSizeOf;
11use futures::prelude::stream::TryStreamExt;
12use lance_arrow::RecordBatchExt;
13use lance_core::{Error, ROW_ID, Result};
14use lance_encoding::decoder::FilterExpression;
15use lance_file::reader::FileReader;
16use lance_io::ReadBatchParams;
17use lance_linalg::distance::DistanceType;
18use prost::Message;
19use std::{any::Any, sync::Arc};
20
21use crate::frag_reuse::FragReuseIndex;
22use crate::{
23 pb,
24 vector::{
25 ivf::storage::{IVF_METADATA_KEY, IvfModel},
26 quantizer::Quantization,
27 },
28};
29
30use super::DISTANCE_TYPE_KEY;
31use super::graph::OrderedFloat;
32use super::graph::OrderedNode;
33use super::quantizer::{Quantizer, QuantizerMetadata};
34
/// Computes distances from one fixed reference (fixed when the calculator is
/// created) to vectors addressed by `u32` ids.
pub trait DistCalculator {
    /// Returns the distance to the vector identified by `id`.
    fn distance(&self, id: u32) -> f32;

    /// Returns a batch of distances as a freshly allocated `Vec<f32>`.
    ///
    /// NOTE(review): presumably one distance per stored vector, with `k_hint`
    /// being an optimisation hint for the expected top-k size — confirm
    /// against the implementors.
    fn distance_all(&self, k_hint: usize) -> Vec<f32>;

    /// Same as [`DistCalculator::distance_all`], but writes the result into
    /// the caller-provided `dists` buffer.
    ///
    /// Any previous content of `dists` is discarded. The default
    /// implementation simply delegates to `distance_all` and never touches
    /// the two scratch buffers; implementors may override it to make use of
    /// them.
    fn distance_all_with_scratch(
        &self,
        k_hint: usize,
        dists: &mut Vec<f32>,
        _u16_scratch: &mut Vec<u16>,
        _u8_scratch: &mut Vec<u8>,
    ) {
        dists.clear();
        let computed = self.distance_all(k_hint);
        dists.extend_from_slice(&computed);
    }

    /// Hook invoked ahead of a `distance(id)` call; a no-op by default.
    fn prefetch(&self, _id: u32) {}
}
61
/// Schema-metadata key under which the quantizer storage metadata is kept.
///
/// `IvfQuantizationStorage::try_new` reads the value stored under this key
/// and parses it as a JSON array of strings.
pub const STORAGE_METADATA_KEY: &str = "storage_metadata";
63
64pub trait VectorStore: Send + Sync + Sized + Clone {
78 type DistanceCalculator<'a>: DistCalculator
79 where
80 Self: 'a;
81
82 fn as_any(&self) -> &dyn Any;
83
84 fn schema(&self) -> &SchemaRef;
85
86 fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;
87
88 fn len(&self) -> usize;
89
90 fn is_empty(&self) -> bool {
92 self.len() == 0
93 }
94
95 fn distance_type(&self) -> DistanceType;
97
98 fn row_id(&self, id: u32) -> u64;
100
101 fn row_ids(&self) -> impl Iterator<Item = &u64>;
102
103 fn append_batch(&self, batch: RecordBatch, vector_column: &str) -> Result<Self>;
106
107 fn dist_calculator(&self, query: ArrayRef, dist_q_c: f32) -> Self::DistanceCalculator<'_>;
112
113 fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;
114
115 fn dist_between(&self, u: u32, v: u32) -> f32 {
116 let dist_cal_u = self.dist_calculator_from_id(u);
117 dist_cal_u.distance(v)
118 }
119
120 fn prefers_candidate(&self, candidate: &OrderedNode, selected: &[OrderedNode]) -> bool {
121 let dist_cal_candidate = self.dist_calculator_from_id(candidate.id);
122 selected
123 .iter()
124 .all(|other| candidate.dist < OrderedFloat(dist_cal_candidate.distance(other.id)))
125 }
126}
127
128pub struct StorageBuilder<Q: Quantization> {
129 vector_column: String,
130 distance_type: DistanceType,
131 quantizer: Q,
132
133 frag_reuse_index: Option<Arc<FragReuseIndex>>,
134}
135
136impl<Q: Quantization> StorageBuilder<Q> {
137 pub fn new(
138 vector_column: String,
139 distance_type: DistanceType,
140 quantizer: Q,
141 frag_reuse_index: Option<Arc<FragReuseIndex>>,
142 ) -> Result<Self> {
143 Ok(Self {
144 vector_column,
145 distance_type,
146 quantizer,
147 frag_reuse_index,
148 })
149 }
150
151 pub fn build(&self, batches: Vec<RecordBatch>) -> Result<Q::Storage> {
152 let mut batch = concat_batches(batches[0].schema_ref(), batches.iter())?;
153
154 if batch.column_by_name(self.quantizer.column()).is_none() {
155 let vectors = batch
156 .column_by_name(&self.vector_column)
157 .ok_or(Error::index(format!(
158 "Vector column {} not found in batch",
159 self.vector_column
160 )))?;
161 let codes = self.quantizer.quantize(vectors)?;
162 batch = batch.drop_column(&self.vector_column)?.try_with_column(
163 arrow_schema::Field::new(self.quantizer.column(), codes.data_type().clone(), true),
164 codes,
165 )?;
166 }
167
168 debug_assert!(batch.column_by_name(ROW_ID).is_some());
169 debug_assert!(batch.column_by_name(self.quantizer.column()).is_some());
170
171 Q::Storage::try_from_batch(
172 batch,
173 &self.quantizer.metadata(None),
174 self.distance_type,
175 self.frag_reuse_index.clone(),
176 )
177 }
178}
179
180#[derive(Debug)]
182pub struct IvfQuantizationStorage<Q: Quantization> {
183 reader: FileReader,
184
185 distance_type: DistanceType,
186 metadata: Q::Metadata,
187
188 ivf: IvfModel,
189 frag_reuse_index: Option<Arc<FragReuseIndex>>,
190}
191
192impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
193 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
194 self.metadata.deep_size_of_children(context) + self.ivf.deep_size_of_children(context)
195 }
196}
197
198impl<Q: Quantization> IvfQuantizationStorage<Q> {
199 pub async fn try_new(
203 reader: FileReader,
204 frag_reuse_index: Option<Arc<FragReuseIndex>>,
205 ) -> Result<Self> {
206 let schema = reader.schema();
207
208 let distance_type = DistanceType::try_from(
209 schema
210 .metadata
211 .get(DISTANCE_TYPE_KEY)
212 .ok_or(Error::index(format!("{} not found", DISTANCE_TYPE_KEY)))?
213 .as_str(),
214 )?;
215
216 let ivf_pos = schema
217 .metadata
218 .get(IVF_METADATA_KEY)
219 .ok_or(Error::index(format!("{} not found", IVF_METADATA_KEY)))?
220 .parse()
221 .map_err(|e| Error::index(format!("Failed to decode IVF metadata: {}", e)))?;
222 let ivf_bytes = reader.read_global_buffer(ivf_pos).await?;
223 let ivf = IvfModel::try_from(pb::Ivf::decode(ivf_bytes)?)?;
224
225 let mut metadata: Vec<String> = serde_json::from_str(
226 schema
227 .metadata
228 .get(STORAGE_METADATA_KEY)
229 .ok_or(Error::index(format!("{} not found", STORAGE_METADATA_KEY)))?
230 .as_str(),
231 )?;
232 debug_assert_eq!(metadata.len(), 1);
233 let metadata = metadata
235 .pop()
236 .ok_or(Error::index("metadata is empty".to_string()))?;
237 let mut metadata: Q::Metadata = serde_json::from_str(&metadata)?;
238 if let Some(pos) = metadata.buffer_index() {
241 let bytes = reader.read_global_buffer(pos).await?;
242 metadata.parse_buffer(bytes)?;
243 }
244
245 Ok(Self {
246 reader,
247 distance_type,
248 metadata,
249 ivf,
250 frag_reuse_index,
251 })
252 }
253
254 pub fn from_cached(
257 reader: FileReader,
258 ivf: IvfModel,
259 metadata: Q::Metadata,
260 distance_type: DistanceType,
261 frag_reuse_index: Option<Arc<FragReuseIndex>>,
262 ) -> Self {
263 Self {
264 reader,
265 distance_type,
266 metadata,
267 ivf,
268 frag_reuse_index,
269 }
270 }
271
272 pub fn reader(&self) -> &FileReader {
273 &self.reader
274 }
275
276 pub fn ivf(&self) -> &IvfModel {
277 &self.ivf
278 }
279
280 pub fn num_rows(&self) -> u64 {
281 self.reader.num_rows()
282 }
283
284 pub fn partition_size(&self, part_id: usize) -> usize {
285 self.ivf.partition_size(part_id)
286 }
287
288 pub fn quantizer(&self) -> Result<Quantizer> {
289 let metadata = self.metadata();
290 Q::from_metadata(metadata, self.distance_type)
291 }
292
293 pub fn metadata(&self) -> &Q::Metadata {
294 &self.metadata
295 }
296
297 pub fn distance_type(&self) -> DistanceType {
298 self.distance_type
299 }
300
301 pub fn schema(&self) -> SchemaRef {
302 Arc::new(self.reader.schema().as_ref().into())
303 }
304
305 pub fn num_partitions(&self) -> usize {
307 self.ivf.num_partitions()
308 }
309
310 pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
311 let range = self.ivf.row_range(part_id);
312 let batch = if range.is_empty() {
313 let schema = self.reader.schema();
314 let arrow_schema = arrow_schema::Schema::from(schema.as_ref());
315 RecordBatch::new_empty(Arc::new(arrow_schema))
316 } else {
317 let batches = self
318 .reader
319 .read_stream(
320 ReadBatchParams::Range(range),
321 u32::MAX,
322 1,
323 FilterExpression::no_filter(),
324 )
325 .await?
326 .try_collect::<Vec<_>>()
327 .await?;
328 let schema = Arc::new(self.reader.schema().as_ref().into());
329 concat_batches(&schema, batches.iter())?
330 };
331 Q::Storage::try_from_batch(
332 batch,
333 self.metadata(),
334 self.distance_type,
335 self.frag_reuse_index.clone(),
336 )
337 }
338}