1use crate::vector::quantizer::QuantizerStorage;
7use arrow::compute::concat_batches;
8use arrow_array::{ArrayRef, RecordBatch};
9use arrow_schema::SchemaRef;
10use deepsize::DeepSizeOf;
11use futures::prelude::stream::TryStreamExt;
12use lance_arrow::RecordBatchExt;
13use lance_core::{Error, Result, ROW_ID};
14use lance_encoding::decoder::FilterExpression;
15use lance_file::reader::FileReader;
16use lance_io::ReadBatchParams;
17use lance_linalg::distance::DistanceType;
18use prost::Message;
19use snafu::location;
20use std::{any::Any, sync::Arc};
21
22use crate::frag_reuse::FragReuseIndex;
23use crate::{
24 pb,
25 vector::{
26 ivf::storage::{IvfModel, IVF_METADATA_KEY},
27 quantizer::Quantization,
28 },
29};
30
31use super::quantizer::{Quantizer, QuantizerMetadata};
32use super::DISTANCE_TYPE_KEY;
33
/// Computes distances from a fixed query (set at construction time) to
/// stored vectors addressed by their local `u32` ids.
pub trait DistCalculator {
    /// Distance from the calculator's query to the vector with the given `id`.
    fn distance(&self, id: u32) -> f32;

    /// Distances from the query to all vectors in the store.
    /// `k_hint` presumably hints how many nearest results the caller needs,
    /// letting implementations skip exact work — TODO confirm with impls.
    fn distance_all(&self, k_hint: usize) -> Vec<f32>;

    /// Optional hint to prefetch data for `id` ahead of a `distance` call.
    /// Default implementation is a no-op.
    fn prefetch(&self, _id: u32) {}
}
48
/// Schema-metadata key under which storage/quantizer metadata is stored,
/// encoded as a JSON array of JSON strings (see `IvfQuantizationStorage::try_new`).
pub const STORAGE_METADATA_KEY: &str = "storage_metadata";
50
/// An in-memory store of (possibly quantized) vectors addressable by a
/// dense local `u32` id, supporting distance computations against queries.
pub trait VectorStore: Send + Sync + Sized + Clone {
    /// Distance calculator type, borrowing from this store.
    type DistanceCalculator<'a>: DistCalculator
    where
        Self: 'a;

    /// Upcast for dynamic-type inspection.
    fn as_any(&self) -> &dyn Any;

    /// Arrow schema of the stored data.
    fn schema(&self) -> &SchemaRef;

    /// Export the store's contents as record batches.
    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;

    /// Number of vectors in the store.
    fn len(&self) -> usize;

    /// True when the store holds no vectors.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Distance type used by this store's calculators.
    fn distance_type(&self) -> DistanceType;

    /// Row id of the vector with local id `id`.
    fn row_id(&self, id: u32) -> u64;

    /// Iterator over all row ids — presumably in local-id order; confirm
    /// with implementations before relying on ordering.
    fn row_ids(&self) -> impl Iterator<Item = &u64>;

    /// Return a new store with `batch` appended; `vector_column` names the
    /// column holding the vectors.
    fn append_batch(&self, batch: RecordBatch, vector_column: &str) -> Result<Self>;

    /// Create a distance calculator for `query`.
    /// NOTE(review): `dist_q_c` looks like a precomputed query-to-centroid
    /// distance used by some quantizers — confirm against implementations.
    fn dist_calculator(&self, query: ArrayRef, dist_q_c: f32) -> Self::DistanceCalculator<'_>;

    /// Create a distance calculator whose query is the stored vector `id`.
    fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;

    /// Distance between two stored vectors `u` and `v`.
    fn dist_between(&self, u: u32, v: u32) -> f32 {
        let dist_cal_u = self.dist_calculator_from_id(u);
        dist_cal_u.distance(v)
    }
}
107
/// Builds a quantized vector storage (`Q::Storage`) from raw record batches.
pub struct StorageBuilder<Q: Quantization> {
    /// Name of the column holding the raw vectors to quantize.
    vector_column: String,
    /// Distance type the resulting storage will use.
    distance_type: DistanceType,
    /// Quantizer used to encode vectors into codes.
    quantizer: Q,

    /// Optional fragment-reuse index passed through to the built storage.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
}
115
116impl<Q: Quantization> StorageBuilder<Q> {
117 pub fn new(
118 vector_column: String,
119 distance_type: DistanceType,
120 quantizer: Q,
121 frag_reuse_index: Option<Arc<FragReuseIndex>>,
122 ) -> Result<Self> {
123 Ok(Self {
124 vector_column,
125 distance_type,
126 quantizer,
127 frag_reuse_index,
128 })
129 }
130
131 pub fn build(&self, batches: Vec<RecordBatch>) -> Result<Q::Storage> {
132 let mut batch = concat_batches(batches[0].schema_ref(), batches.iter())?;
133
134 if batch.column_by_name(self.quantizer.column()).is_none() {
135 let vectors = batch
136 .column_by_name(&self.vector_column)
137 .ok_or(Error::Index {
138 message: format!("Vector column {} not found in batch", self.vector_column),
139 location: location!(),
140 })?;
141 let codes = self.quantizer.quantize(vectors)?;
142 batch = batch.drop_column(&self.vector_column)?.try_with_column(
143 arrow_schema::Field::new(self.quantizer.column(), codes.data_type().clone(), true),
144 codes,
145 )?;
146 }
147
148 debug_assert!(batch.column_by_name(ROW_ID).is_some());
149 debug_assert!(batch.column_by_name(self.quantizer.column()).is_some());
150
151 Q::Storage::try_from_batch(
152 batch,
153 &self.quantizer.metadata(None),
154 self.distance_type,
155 self.frag_reuse_index.clone(),
156 )
157 }
158}
159
/// IVF-partitioned quantized vector storage backed by a Lance file.
///
/// Metadata (distance type, IVF model, quantizer metadata) is loaded
/// eagerly in `try_new`; partition data is read on demand via
/// `load_partition`.
#[derive(Debug)]
pub struct IvfQuantizationStorage<Q: Quantization> {
    /// Reader over the file containing the quantized vectors.
    reader: FileReader,

    /// Distance type recorded in the file's schema metadata.
    distance_type: DistanceType,
    /// Quantizer metadata decoded from the file.
    metadata: Q::Metadata,

    /// IVF model describing per-partition row ranges.
    ivf: IvfModel,
    /// Optional fragment-reuse index forwarded to built storages.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
}
171
172impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
173 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
174 self.metadata.deep_size_of_children(context) + self.ivf.deep_size_of_children(context)
175 }
176}
177
178impl<Q: Quantization> IvfQuantizationStorage<Q> {
179 pub async fn try_new(
183 reader: FileReader,
184 frag_reuse_index: Option<Arc<FragReuseIndex>>,
185 ) -> Result<Self> {
186 let schema = reader.schema();
187
188 let distance_type = DistanceType::try_from(
189 schema
190 .metadata
191 .get(DISTANCE_TYPE_KEY)
192 .ok_or(Error::Index {
193 message: format!("{} not found", DISTANCE_TYPE_KEY),
194 location: location!(),
195 })?
196 .as_str(),
197 )?;
198
199 let ivf_pos = schema
200 .metadata
201 .get(IVF_METADATA_KEY)
202 .ok_or(Error::Index {
203 message: format!("{} not found", IVF_METADATA_KEY),
204 location: location!(),
205 })?
206 .parse()
207 .map_err(|e| Error::Index {
208 message: format!("Failed to decode IVF metadata: {}", e),
209 location: location!(),
210 })?;
211 let ivf_bytes = reader.read_global_buffer(ivf_pos).await?;
212 let ivf = IvfModel::try_from(pb::Ivf::decode(ivf_bytes)?)?;
213
214 let mut metadata: Vec<String> = serde_json::from_str(
215 schema
216 .metadata
217 .get(STORAGE_METADATA_KEY)
218 .ok_or(Error::Index {
219 message: format!("{} not found", STORAGE_METADATA_KEY),
220 location: location!(),
221 })?
222 .as_str(),
223 )?;
224 debug_assert_eq!(metadata.len(), 1);
225 let metadata = metadata.pop().ok_or(Error::Index {
227 message: "metadata is empty".to_string(),
228 location: location!(),
229 })?;
230 let mut metadata: Q::Metadata = serde_json::from_str(&metadata)?;
231 if let Some(pos) = metadata.buffer_index() {
234 let bytes = reader.read_global_buffer(pos).await?;
235 metadata.parse_buffer(bytes)?;
236 }
237
238 Ok(Self {
239 reader,
240 distance_type,
241 metadata,
242 ivf,
243 frag_reuse_index,
244 })
245 }
246
247 pub fn num_rows(&self) -> u64 {
248 self.reader.num_rows()
249 }
250
251 pub fn partition_size(&self, part_id: usize) -> usize {
252 self.ivf.partition_size(part_id)
253 }
254
255 pub fn quantizer(&self) -> Result<Quantizer> {
256 let metadata = self.metadata();
257 Q::from_metadata(metadata, self.distance_type)
258 }
259
260 pub fn metadata(&self) -> &Q::Metadata {
261 &self.metadata
262 }
263
264 pub fn distance_type(&self) -> DistanceType {
265 self.distance_type
266 }
267
268 pub fn schema(&self) -> SchemaRef {
269 Arc::new(self.reader.schema().as_ref().into())
270 }
271
272 pub fn num_partitions(&self) -> usize {
274 self.ivf.num_partitions()
275 }
276
277 pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
278 let range = self.ivf.row_range(part_id);
279 let batch = if range.is_empty() {
280 let schema = self.reader.schema();
281 let arrow_schema = arrow_schema::Schema::from(schema.as_ref());
282 RecordBatch::new_empty(Arc::new(arrow_schema))
283 } else {
284 let batches = self
285 .reader
286 .read_stream(
287 ReadBatchParams::Range(range),
288 u32::MAX,
289 1,
290 FilterExpression::no_filter(),
291 )?
292 .try_collect::<Vec<_>>()
293 .await?;
294 let schema = Arc::new(self.reader.schema().as_ref().into());
295 concat_batches(&schema, batches.iter())?
296 };
297 Q::Storage::try_from_batch(
298 batch,
299 self.metadata(),
300 self.distance_type,
301 self.frag_reuse_index.clone(),
302 )
303 }
304}