1use std::{
5 any::Any,
6 collections::HashMap,
7 fmt::{Debug, Formatter},
8 sync::Arc,
9};
10
11use arrow_array::{Float32Array, RecordBatch, UInt32Array};
12use async_trait::async_trait;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
15use deepsize::DeepSizeOf;
16use lance_arrow::RecordBatchExt;
17use lance_core::ROW_ID;
18use lance_core::{datatypes::Schema, Error, Result};
19use lance_file::previous::reader::FileReader as PreviousFileReader;
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use lance_table::format::SelfDescribingFileReader;
23use roaring::RoaringBitmap;
24use serde_json::json;
25use snafu::location;
26use tracing::instrument;
27
28use crate::vector::ivf::storage::IvfModel;
29use crate::vector::quantizer::QuantizationType;
30use crate::vector::v3::subindex::{IvfSubIndex, SubIndexType};
31use crate::{metrics::MetricsCollector, prefilter::PreFilter};
32use crate::{
33 vector::{
34 graph::NEIGHBORS_FIELD,
35 hnsw::{HnswMetadata, HNSW, VECTOR_ID_FIELD},
36 ivf::storage::IVF_PARTITION_KEY,
37 quantizer::{IvfQuantizationStorage, Quantization, Quantizer},
38 storage::VectorStore,
39 Query, VectorIndex,
40 },
41 Index, IndexType,
42};
43
44#[derive(Clone, DeepSizeOf)]
45pub struct HNSWIndexOptions {
46 pub use_residual: bool,
47}
48
/// HNSW vector index backed by quantized vector storage of type `Q::Storage`.
///
/// An instance created by `try_new` has neither a graph nor vectors loaded
/// (`hnsw` and `storage` are `None`); `load` and `load_partition` return new
/// instances with both populated.
#[derive(Clone, DeepSizeOf)]
pub struct HNSWIndex<Q: Quantization> {
    /// The loaded HNSW graph, or `None` until `load` / `load_partition` has produced it.
    hnsw: Option<HNSW>,
    /// Vector storage of the loaded partition, or `None` when nothing is loaded.
    storage: Option<Arc<Q::Storage>>,

    /// Quantized storage opened from the auxiliary reader; partitions are loaded from it.
    partition_storage: IvfQuantizationStorage<Q>,
    /// Per-partition HNSW metadata parsed from the `IVF_PARTITION_KEY` schema
    /// metadata entry, or `None` when the index file has no such entry.
    partition_metadata: Option<Vec<HnswMetadata>>,

    /// Options supplied when the index was opened.
    options: HNSWIndexOptions,
}
61
62impl<Q: Quantization> Debug for HNSWIndex<Q> {
63 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64 self.hnsw.fmt(f)
65 }
66}
67
68impl<Q: Quantization> HNSWIndex<Q> {
69 pub async fn try_new(
70 reader: Arc<dyn Reader>,
71 aux_reader: Arc<dyn Reader>,
72 options: HNSWIndexOptions,
73 ) -> Result<Self> {
74 let reader =
75 PreviousFileReader::try_new_self_described_from_reader(reader.clone(), None).await?;
76
77 let partition_metadata = match reader.schema().metadata.get(IVF_PARTITION_KEY) {
78 Some(json) => {
79 let metadata: Vec<HnswMetadata> = serde_json::from_str(json)?;
80 Some(metadata)
81 }
82 None => None,
83 };
84
85 let ivf_store = IvfQuantizationStorage::open(aux_reader).await?;
86 Ok(Self {
87 hnsw: None,
88 storage: None,
89 partition_storage: ivf_store,
90 partition_metadata,
91 options,
92 })
93 }
94
95 pub fn quantizer(&self) -> &Quantizer {
96 self.partition_storage.quantizer()
97 }
98
99 pub fn metadata(&self) -> HnswMetadata {
100 self.partition_metadata.as_ref().unwrap()[0].clone()
101 }
102
103 fn get_partition_metadata(&self, partition_id: usize) -> Result<HnswMetadata> {
104 match self.partition_metadata {
105 Some(ref metadata) => Ok(metadata[partition_id].clone()),
106 None => Err(Error::Index {
107 message: "No partition metadata found".to_string(),
108 location: location!(),
109 }),
110 }
111 }
112}
113
114#[async_trait]
115impl<Q: Quantization + Send + Sync + 'static> Index for HNSWIndex<Q> {
116 fn as_any(&self) -> &dyn Any {
118 self
119 }
120
121 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
123 self
124 }
125
126 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
128 Ok(self)
129 }
130
131 fn statistics(&self) -> Result<serde_json::Value> {
133 Ok(json!({
134 "index_type": "HNSW",
135 "distance_type": self.partition_storage.distance_type().to_string(),
136 }))
137 }
138
139 async fn prewarm(&self) -> Result<()> {
140 Ok(())
142 }
143
144 fn index_type(&self) -> IndexType {
146 IndexType::Vector
147 }
148
149 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
154 unimplemented!()
155 }
156}
157
158#[async_trait]
159impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
160 #[instrument(level = "debug", skip_all, name = "HNSWIndex::search")]
161 async fn search(
162 &self,
163 query: &Query,
164 pre_filter: Arc<dyn PreFilter>,
165 metrics: &dyn MetricsCollector,
166 ) -> Result<RecordBatch> {
167 let hnsw = self.hnsw.as_ref().ok_or(Error::Index {
168 message: "HNSW index not loaded".to_string(),
169 location: location!(),
170 })?;
171
172 let storage = self.storage.as_ref().ok_or(Error::Index {
173 message: "vector storage not loaded".to_string(),
174 location: location!(),
175 })?;
176
177 let refine_factor = query.refine_factor.unwrap_or(1) as usize;
178 let k = query.k * refine_factor;
179
180 hnsw.search(
181 query.key.clone(),
182 k,
183 query.into(),
184 storage.as_ref(),
185 pre_filter,
186 metrics,
187 )
188 }
189
190 fn find_partitions(&self, _: &Query) -> Result<(UInt32Array, Float32Array)> {
191 unimplemented!("only for IVF")
192 }
193
194 fn total_partitions(&self) -> usize {
195 1
196 }
197
198 async fn search_in_partition(
199 &self,
200 _: usize,
201 _: &Query,
202 _: Arc<dyn PreFilter>,
203 _: &dyn MetricsCollector,
204 ) -> Result<RecordBatch> {
205 unimplemented!("only for IVF")
206 }
207
208 fn is_loadable(&self) -> bool {
209 true
210 }
211
212 fn use_residual(&self) -> bool {
213 self.options.use_residual
214 }
215
216 async fn load(
217 &self,
218 reader: Arc<dyn Reader>,
219 _offset: usize,
220 _length: usize,
221 ) -> Result<Box<dyn VectorIndex>> {
222 let schema = Schema::try_from(&arrow_schema::Schema::new(vec![
223 NEIGHBORS_FIELD.clone(),
224 VECTOR_ID_FIELD.clone(),
225 ]))?;
226
227 let reader = PreviousFileReader::try_new_from_reader(
228 reader.path(),
229 reader.clone(),
230 None,
231 schema,
232 0,
233 0,
234 2,
235 None,
236 )
237 .await?;
238
239 let storage = Arc::new(self.partition_storage.load_partition(0).await?);
240 let batch = reader.read_range(0..reader.len(), reader.schema()).await?;
241 let hnsw = HNSW::load(batch)?;
242
243 Ok(Box::new(Self {
244 hnsw: Some(hnsw),
245 storage: Some(storage),
246 partition_storage: self.partition_storage.clone(),
247 partition_metadata: self.partition_metadata.clone(),
248 options: self.options.clone(),
249 }))
250 }
251
252 async fn load_partition(
253 &self,
254 reader: Arc<dyn Reader>,
255 offset: usize,
256 length: usize,
257 partition_id: usize,
258 ) -> Result<Box<dyn VectorIndex>> {
259 let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
260
261 let metadata = self.get_partition_metadata(partition_id)?;
262 let storage = Arc::new(self.partition_storage.load_partition(partition_id).await?);
263 let batch = reader
264 .read_range(offset..offset + length, reader.schema())
265 .await?;
266 let mut schema = batch.schema_ref().as_ref().clone();
267 schema.metadata.insert(
268 HNSW::metadata_key().to_owned(),
269 serde_json::to_string(&metadata)?,
270 );
271 let batch = batch.with_schema(schema.into())?;
272 let hnsw = HNSW::load(batch)?;
273
274 Ok(Box::new(Self {
275 hnsw: Some(hnsw),
276 storage: Some(storage),
277 partition_storage: self.partition_storage.clone(),
278 partition_metadata: self.partition_metadata.clone(),
279 options: self.options.clone(),
280 }))
281 }
282
283 async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream> {
284 let store = self.storage.as_ref().ok_or(Error::Index {
285 message: "vector storage not loaded".to_string(),
286 location: location!(),
287 })?;
288
289 let schema = if with_vector {
290 store.schema().clone()
291 } else {
292 let schema = store.schema();
293 let row_id_idx = schema.index_of(ROW_ID)?;
294 Arc::new(schema.project(&[row_id_idx])?)
295 };
296
297 let batches = store
298 .to_batches()?
299 .map(|b| {
300 let batch = b.project_by_schema(&schema)?;
301 Ok(batch)
302 })
303 .collect::<Vec<_>>();
304 let stream = futures::stream::iter(batches);
305 let stream = RecordBatchStreamAdapter::new(schema, stream);
306 Ok(Box::pin(stream))
307 }
308
309 fn num_rows(&self) -> u64 {
310 self.hnsw
311 .as_ref()
312 .map_or(0, |hnsw| hnsw.num_nodes(0) as u64)
313 }
314
315 fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_> {
316 Box::new(self.storage.as_ref().unwrap().row_ids())
317 }
318
319 async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
320 Err(Error::Index {
321 message: "Remapping HNSW in this way not supported".to_string(),
322 location: location!(),
323 })
324 }
325
326 fn ivf_model(&self) -> &IvfModel {
327 unimplemented!("only for IVF")
328 }
329
330 fn quantizer(&self) -> Quantizer {
331 self.partition_storage.quantizer().clone()
332 }
333
334 fn partition_size(&self, _: usize) -> usize {
335 unimplemented!("only for IVF")
336 }
337
338 fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
339 (
340 SubIndexType::Hnsw,
341 self.partition_storage.quantizer().quantization_type(),
342 )
343 }
344
345 fn metric_type(&self) -> DistanceType {
346 self.partition_storage.distance_type()
347 }
348}