1use std::{
5 any::Any,
6 collections::HashMap,
7 fmt::{Debug, Formatter},
8 sync::Arc,
9};
10
11use arrow_array::{Float32Array, RecordBatch, UInt32Array};
12use async_trait::async_trait;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
15use deepsize::DeepSizeOf;
16use lance_arrow::RecordBatchExt;
17use lance_core::ROW_ID;
18use lance_core::{Error, Result, datatypes::Schema};
19use lance_file::previous::reader::FileReader as PreviousFileReader;
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use lance_table::format::SelfDescribingFileReader;
23use roaring::RoaringBitmap;
24use serde_json::json;
25use tracing::instrument;
26
27use crate::vector::ivf::storage::IvfModel;
28use crate::vector::quantizer::QuantizationType;
29use crate::vector::v3::subindex::{IvfSubIndex, SubIndexType};
30use crate::{
31 Index, IndexType,
32 vector::{
33 Query, VectorIndex,
34 graph::NEIGHBORS_FIELD,
35 hnsw::{HNSW, HnswMetadata, VECTOR_ID_FIELD},
36 ivf::storage::IVF_PARTITION_KEY,
37 quantizer::{IvfQuantizationStorage, Quantization, Quantizer},
38 storage::VectorStore,
39 },
40};
41use crate::{metrics::MetricsCollector, prefilter::PreFilter};
42
43#[derive(Clone, DeepSizeOf)]
44pub struct HNSWIndexOptions {
45 pub use_residual: bool,
46}
47
48#[derive(Clone, DeepSizeOf)]
49pub struct HNSWIndex<Q: Quantization> {
50 hnsw: Option<HNSW>,
52 storage: Option<Arc<Q::Storage>>,
53
54 partition_storage: IvfQuantizationStorage<Q>,
56 partition_metadata: Option<Vec<HnswMetadata>>,
57
58 options: HNSWIndexOptions,
59}
60
61impl<Q: Quantization> Debug for HNSWIndex<Q> {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 self.hnsw.fmt(f)
64 }
65}
66
67impl<Q: Quantization> HNSWIndex<Q> {
68 pub async fn try_new(
69 reader: Arc<dyn Reader>,
70 aux_reader: Arc<dyn Reader>,
71 options: HNSWIndexOptions,
72 ) -> Result<Self> {
73 let reader =
74 PreviousFileReader::try_new_self_described_from_reader(reader.clone(), None).await?;
75
76 let partition_metadata = match reader.schema().metadata.get(IVF_PARTITION_KEY) {
77 Some(json) => {
78 let metadata: Vec<HnswMetadata> = serde_json::from_str(json)?;
79 Some(metadata)
80 }
81 None => None,
82 };
83
84 let ivf_store = IvfQuantizationStorage::open(aux_reader).await?;
85 Ok(Self {
86 hnsw: None,
87 storage: None,
88 partition_storage: ivf_store,
89 partition_metadata,
90 options,
91 })
92 }
93
94 pub fn quantizer(&self) -> &Quantizer {
95 self.partition_storage.quantizer()
96 }
97
98 pub fn metadata(&self) -> HnswMetadata {
99 self.partition_metadata.as_ref().unwrap()[0].clone()
100 }
101
102 fn get_partition_metadata(&self, partition_id: usize) -> Result<HnswMetadata> {
103 match self.partition_metadata {
104 Some(ref metadata) => Ok(metadata[partition_id].clone()),
105 None => Err(Error::index("No partition metadata found".to_string())),
106 }
107 }
108}
109
110#[async_trait]
111impl<Q: Quantization + Send + Sync + 'static> Index for HNSWIndex<Q> {
112 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
119 self
120 }
121
122 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
124 Ok(self)
125 }
126
127 fn statistics(&self) -> Result<serde_json::Value> {
129 Ok(json!({
130 "index_type": "HNSW",
131 "distance_type": self.partition_storage.distance_type().to_string(),
132 }))
133 }
134
135 async fn prewarm(&self) -> Result<()> {
136 Ok(())
138 }
139
140 fn index_type(&self) -> IndexType {
142 IndexType::Vector
143 }
144
145 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
150 unimplemented!()
151 }
152}
153
154#[async_trait]
155impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
156 #[instrument(level = "debug", skip_all, name = "HNSWIndex::search")]
157 async fn search(
158 &self,
159 query: &Query,
160 pre_filter: Arc<dyn PreFilter>,
161 metrics: &dyn MetricsCollector,
162 ) -> Result<RecordBatch> {
163 let hnsw = self
164 .hnsw
165 .as_ref()
166 .ok_or(Error::index("HNSW index not loaded".to_string()))?;
167
168 let storage = self
169 .storage
170 .as_ref()
171 .ok_or(Error::index("vector storage not loaded".to_string()))?;
172
173 let refine_factor = query.refine_factor.unwrap_or(1) as usize;
174 let k = query.k * refine_factor;
175
176 hnsw.search(
177 query.key.clone(),
178 k,
179 query.into(),
180 storage.as_ref(),
181 pre_filter,
182 metrics,
183 )
184 }
185
186 fn find_partitions(&self, _: &Query) -> Result<(UInt32Array, Float32Array)> {
187 unimplemented!("only for IVF")
188 }
189
190 fn total_partitions(&self) -> usize {
191 1
192 }
193
194 async fn search_in_partition(
195 &self,
196 _: usize,
197 _: &Query,
198 _: Arc<dyn PreFilter>,
199 _: &dyn MetricsCollector,
200 ) -> Result<RecordBatch> {
201 unimplemented!("only for IVF")
202 }
203
204 fn is_loadable(&self) -> bool {
205 true
206 }
207
208 fn use_residual(&self) -> bool {
209 self.options.use_residual
210 }
211
212 async fn load(
213 &self,
214 reader: Arc<dyn Reader>,
215 _offset: usize,
216 _length: usize,
217 ) -> Result<Box<dyn VectorIndex>> {
218 let schema = Schema::try_from(&arrow_schema::Schema::new(vec![
219 NEIGHBORS_FIELD.clone(),
220 VECTOR_ID_FIELD.clone(),
221 ]))?;
222
223 let reader = PreviousFileReader::try_new_from_reader(
224 reader.path(),
225 reader.clone(),
226 None,
227 schema,
228 0,
229 0,
230 2,
231 None,
232 )
233 .await?;
234
235 let storage = Arc::new(self.partition_storage.load_partition(0).await?);
236 let batch = reader.read_range(0..reader.len(), reader.schema()).await?;
237 let hnsw = HNSW::load(batch)?;
238
239 Ok(Box::new(Self {
240 hnsw: Some(hnsw),
241 storage: Some(storage),
242 partition_storage: self.partition_storage.clone(),
243 partition_metadata: self.partition_metadata.clone(),
244 options: self.options.clone(),
245 }))
246 }
247
248 async fn load_partition(
249 &self,
250 reader: Arc<dyn Reader>,
251 offset: usize,
252 length: usize,
253 partition_id: usize,
254 ) -> Result<Box<dyn VectorIndex>> {
255 let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
256
257 let metadata = self.get_partition_metadata(partition_id)?;
258 let storage = Arc::new(self.partition_storage.load_partition(partition_id).await?);
259 let batch = reader
260 .read_range(offset..offset + length, reader.schema())
261 .await?;
262 let mut schema = batch.schema_ref().as_ref().clone();
263 schema.metadata.insert(
264 HNSW::metadata_key().to_owned(),
265 serde_json::to_string(&metadata)?,
266 );
267 let batch = batch.with_schema(schema.into())?;
268 let hnsw = HNSW::load(batch)?;
269
270 Ok(Box::new(Self {
271 hnsw: Some(hnsw),
272 storage: Some(storage),
273 partition_storage: self.partition_storage.clone(),
274 partition_metadata: self.partition_metadata.clone(),
275 options: self.options.clone(),
276 }))
277 }
278
279 async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream> {
280 let store = self
281 .storage
282 .as_ref()
283 .ok_or(Error::index("vector storage not loaded".to_string()))?;
284
285 let schema = if with_vector {
286 store.schema().clone()
287 } else {
288 let schema = store.schema();
289 let row_id_idx = schema.index_of(ROW_ID)?;
290 Arc::new(schema.project(&[row_id_idx])?)
291 };
292
293 let batches = store
294 .to_batches()?
295 .map(|b| {
296 let batch = b.project_by_schema(&schema)?;
297 Ok(batch)
298 })
299 .collect::<Vec<_>>();
300 let stream = futures::stream::iter(batches);
301 let stream = RecordBatchStreamAdapter::new(schema, stream);
302 Ok(Box::pin(stream))
303 }
304
305 fn num_rows(&self) -> u64 {
306 self.hnsw
307 .as_ref()
308 .map_or(0, |hnsw| hnsw.num_nodes(0) as u64)
309 }
310
311 fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_> {
312 Box::new(self.storage.as_ref().unwrap().row_ids())
313 }
314
315 async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
316 Err(Error::index(
317 "Remapping HNSW in this way not supported".to_string(),
318 ))
319 }
320
321 fn ivf_model(&self) -> &IvfModel {
322 unimplemented!("only for IVF")
323 }
324
325 fn quantizer(&self) -> Quantizer {
326 self.partition_storage.quantizer().clone()
327 }
328
329 fn partition_size(&self, _: usize) -> usize {
330 unimplemented!("only for IVF")
331 }
332
333 fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
334 (
335 SubIndexType::Hnsw,
336 self.partition_storage.quantizer().quantization_type(),
337 )
338 }
339
340 fn metric_type(&self) -> DistanceType {
341 self.partition_storage.distance_type()
342 }
343}