// lance_index/vector/hnsw/index.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::HashMap,
7    fmt::{Debug, Formatter},
8    sync::Arc,
9};
10
11use arrow_array::{Float32Array, RecordBatch, UInt32Array};
12use async_trait::async_trait;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
15use deepsize::DeepSizeOf;
16use lance_arrow::RecordBatchExt;
17use lance_core::ROW_ID;
18use lance_core::{datatypes::Schema, Error, Result};
19use lance_file::previous::reader::FileReader as PreviousFileReader;
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use lance_table::format::SelfDescribingFileReader;
23use roaring::RoaringBitmap;
24use serde_json::json;
25use snafu::location;
26use tracing::instrument;
27
28use crate::vector::ivf::storage::IvfModel;
29use crate::vector::quantizer::QuantizationType;
30use crate::vector::v3::subindex::{IvfSubIndex, SubIndexType};
31use crate::{metrics::MetricsCollector, prefilter::PreFilter};
32use crate::{
33    vector::{
34        graph::NEIGHBORS_FIELD,
35        hnsw::{HnswMetadata, HNSW, VECTOR_ID_FIELD},
36        ivf::storage::IVF_PARTITION_KEY,
37        quantizer::{IvfQuantizationStorage, Quantization, Quantizer},
38        storage::VectorStore,
39        Query, VectorIndex,
40    },
41    Index, IndexType,
42};
43
/// Options controlling how an [`HNSWIndex`] behaves.
#[derive(Clone, DeepSizeOf)]
pub struct HNSWIndexOptions {
    // Reported verbatim by `VectorIndex::use_residual`.
    // NOTE(review): presumably indicates vectors are stored as residuals
    // relative to their IVF partition centroid — confirm with the IVF builder.
    pub use_residual: bool,
}
48
/// An HNSW graph index over quantized vector storage.
///
/// The graph (`hnsw`) and its vector storage (`storage`) are loaded lazily:
/// both stay `None` until `load` / `load_partition` is called, after which a
/// new, loaded copy of the index is returned.
#[derive(Clone, DeepSizeOf)]
pub struct HNSWIndex<Q: Quantization> {
    // Some(T) if the index is loaded, None otherwise
    hnsw: Option<HNSW>,
    // Quantized vectors backing the loaded graph; None until loaded.
    storage: Option<Arc<Q::Storage>>,

    // TODO: move these into IVFIndex after the refactor is complete
    // Per-partition quantized storage (opened from the auxiliary file).
    partition_storage: IvfQuantizationStorage<Q>,
    // Per-partition HNSW metadata parsed from the IVF_PARTITION_KEY schema
    // metadata; None when the index file carries no such entry.
    partition_metadata: Option<Vec<HnswMetadata>>,

    options: HNSWIndexOptions,
}
61
62impl<Q: Quantization> Debug for HNSWIndex<Q> {
63    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64        self.hnsw.fmt(f)
65    }
66}
67
68impl<Q: Quantization> HNSWIndex<Q> {
69    pub async fn try_new(
70        reader: Arc<dyn Reader>,
71        aux_reader: Arc<dyn Reader>,
72        options: HNSWIndexOptions,
73    ) -> Result<Self> {
74        let reader =
75            PreviousFileReader::try_new_self_described_from_reader(reader.clone(), None).await?;
76
77        let partition_metadata = match reader.schema().metadata.get(IVF_PARTITION_KEY) {
78            Some(json) => {
79                let metadata: Vec<HnswMetadata> = serde_json::from_str(json)?;
80                Some(metadata)
81            }
82            None => None,
83        };
84
85        let ivf_store = IvfQuantizationStorage::open(aux_reader).await?;
86        Ok(Self {
87            hnsw: None,
88            storage: None,
89            partition_storage: ivf_store,
90            partition_metadata,
91            options,
92        })
93    }
94
95    pub fn quantizer(&self) -> &Quantizer {
96        self.partition_storage.quantizer()
97    }
98
99    pub fn metadata(&self) -> HnswMetadata {
100        self.partition_metadata.as_ref().unwrap()[0].clone()
101    }
102
103    fn get_partition_metadata(&self, partition_id: usize) -> Result<HnswMetadata> {
104        match self.partition_metadata {
105            Some(ref metadata) => Ok(metadata[partition_id].clone()),
106            None => Err(Error::Index {
107                message: "No partition metadata found".to_string(),
108                location: location!(),
109            }),
110        }
111    }
112}
113
#[async_trait]
impl<Q: Quantization + Send + Sync + 'static> Index for HNSWIndex<Q> {
    /// Cast to [Any].
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Cast to [Index]
    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
        self
    }

    /// Cast to [VectorIndex]
    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
        Ok(self)
    }

    /// Retrieve index statistics as a JSON Value
    ///
    /// Reports the index type and the distance type used by the
    /// quantized storage.
    fn statistics(&self) -> Result<serde_json::Value> {
        Ok(json!({
            "index_type": "HNSW",
            "distance_type": self.partition_storage.distance_type().to_string(),
        }))
    }

    /// Currently a no-op: nothing is warmed ahead of the first search.
    async fn prewarm(&self) -> Result<()> {
        // TODO: HNSW can (and should) support pre-warming
        Ok(())
    }

    /// Get the type of the index
    fn index_type(&self) -> IndexType {
        IndexType::Vector
    }

    /// Read through the index and determine which fragment ids are covered by the index
    ///
    /// This is a kind of slow operation.  It's better to use the fragment_bitmap.  This
    /// only exists for cases where the fragment_bitmap has become corrupted or missing.
    ///
    /// Not implemented for HNSW; calling this panics.
    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
        unimplemented!()
    }
}
157
158#[async_trait]
159impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
160    #[instrument(level = "debug", skip_all, name = "HNSWIndex::search")]
161    async fn search(
162        &self,
163        query: &Query,
164        pre_filter: Arc<dyn PreFilter>,
165        metrics: &dyn MetricsCollector,
166    ) -> Result<RecordBatch> {
167        let hnsw = self.hnsw.as_ref().ok_or(Error::Index {
168            message: "HNSW index not loaded".to_string(),
169            location: location!(),
170        })?;
171
172        let storage = self.storage.as_ref().ok_or(Error::Index {
173            message: "vector storage not loaded".to_string(),
174            location: location!(),
175        })?;
176
177        let refine_factor = query.refine_factor.unwrap_or(1) as usize;
178        let k = query.k * refine_factor;
179
180        hnsw.search(
181            query.key.clone(),
182            k,
183            query.into(),
184            storage.as_ref(),
185            pre_filter,
186            metrics,
187        )
188    }
189
190    fn find_partitions(&self, _: &Query) -> Result<(UInt32Array, Float32Array)> {
191        unimplemented!("only for IVF")
192    }
193
194    fn total_partitions(&self) -> usize {
195        1
196    }
197
198    async fn search_in_partition(
199        &self,
200        _: usize,
201        _: &Query,
202        _: Arc<dyn PreFilter>,
203        _: &dyn MetricsCollector,
204    ) -> Result<RecordBatch> {
205        unimplemented!("only for IVF")
206    }
207
208    fn is_loadable(&self) -> bool {
209        true
210    }
211
212    fn use_residual(&self) -> bool {
213        self.options.use_residual
214    }
215
216    async fn load(
217        &self,
218        reader: Arc<dyn Reader>,
219        _offset: usize,
220        _length: usize,
221    ) -> Result<Box<dyn VectorIndex>> {
222        let schema = Schema::try_from(&arrow_schema::Schema::new(vec![
223            NEIGHBORS_FIELD.clone(),
224            VECTOR_ID_FIELD.clone(),
225        ]))?;
226
227        let reader = PreviousFileReader::try_new_from_reader(
228            reader.path(),
229            reader.clone(),
230            None,
231            schema,
232            0,
233            0,
234            2,
235            None,
236        )
237        .await?;
238
239        let storage = Arc::new(self.partition_storage.load_partition(0).await?);
240        let batch = reader.read_range(0..reader.len(), reader.schema()).await?;
241        let hnsw = HNSW::load(batch)?;
242
243        Ok(Box::new(Self {
244            hnsw: Some(hnsw),
245            storage: Some(storage),
246            partition_storage: self.partition_storage.clone(),
247            partition_metadata: self.partition_metadata.clone(),
248            options: self.options.clone(),
249        }))
250    }
251
252    async fn load_partition(
253        &self,
254        reader: Arc<dyn Reader>,
255        offset: usize,
256        length: usize,
257        partition_id: usize,
258    ) -> Result<Box<dyn VectorIndex>> {
259        let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
260
261        let metadata = self.get_partition_metadata(partition_id)?;
262        let storage = Arc::new(self.partition_storage.load_partition(partition_id).await?);
263        let batch = reader
264            .read_range(offset..offset + length, reader.schema())
265            .await?;
266        let mut schema = batch.schema_ref().as_ref().clone();
267        schema.metadata.insert(
268            HNSW::metadata_key().to_owned(),
269            serde_json::to_string(&metadata)?,
270        );
271        let batch = batch.with_schema(schema.into())?;
272        let hnsw = HNSW::load(batch)?;
273
274        Ok(Box::new(Self {
275            hnsw: Some(hnsw),
276            storage: Some(storage),
277            partition_storage: self.partition_storage.clone(),
278            partition_metadata: self.partition_metadata.clone(),
279            options: self.options.clone(),
280        }))
281    }
282
283    async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream> {
284        let store = self.storage.as_ref().ok_or(Error::Index {
285            message: "vector storage not loaded".to_string(),
286            location: location!(),
287        })?;
288
289        let schema = if with_vector {
290            store.schema().clone()
291        } else {
292            let schema = store.schema();
293            let row_id_idx = schema.index_of(ROW_ID)?;
294            Arc::new(schema.project(&[row_id_idx])?)
295        };
296
297        let batches = store
298            .to_batches()?
299            .map(|b| {
300                let batch = b.project_by_schema(&schema)?;
301                Ok(batch)
302            })
303            .collect::<Vec<_>>();
304        let stream = futures::stream::iter(batches);
305        let stream = RecordBatchStreamAdapter::new(schema, stream);
306        Ok(Box::pin(stream))
307    }
308
309    fn num_rows(&self) -> u64 {
310        self.hnsw
311            .as_ref()
312            .map_or(0, |hnsw| hnsw.num_nodes(0) as u64)
313    }
314
315    fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_> {
316        Box::new(self.storage.as_ref().unwrap().row_ids())
317    }
318
319    async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
320        Err(Error::Index {
321            message: "Remapping HNSW in this way not supported".to_string(),
322            location: location!(),
323        })
324    }
325
326    fn ivf_model(&self) -> &IvfModel {
327        unimplemented!("only for IVF")
328    }
329
330    fn quantizer(&self) -> Quantizer {
331        self.partition_storage.quantizer().clone()
332    }
333
334    fn partition_size(&self, _: usize) -> usize {
335        unimplemented!("only for IVF")
336    }
337
338    fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
339        (
340            SubIndexType::Hnsw,
341            self.partition_storage.quantizer().quantization_type(),
342        )
343    }
344
345    fn metric_type(&self) -> DistanceType {
346        self.partition_storage.distance_type()
347    }
348}