Skip to main content

lance_index/vector/hnsw/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::HashMap,
7    fmt::{Debug, Formatter},
8    sync::Arc,
9};
10
11use arrow_array::{Float32Array, RecordBatch, UInt32Array};
12use async_trait::async_trait;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
15use deepsize::DeepSizeOf;
16use lance_arrow::RecordBatchExt;
17use lance_core::ROW_ID;
18use lance_core::{Error, Result, datatypes::Schema};
19use lance_file::previous::reader::FileReader as PreviousFileReader;
20use lance_io::traits::Reader;
21use lance_linalg::distance::DistanceType;
22use lance_table::format::SelfDescribingFileReader;
23use roaring::RoaringBitmap;
24use serde_json::json;
25use tracing::instrument;
26
27use crate::vector::ivf::storage::IvfModel;
28use crate::vector::quantizer::QuantizationType;
29use crate::vector::v3::subindex::{IvfSubIndex, SubIndexType};
30use crate::{
31    Index, IndexType,
32    vector::{
33        Query, VectorIndex,
34        graph::NEIGHBORS_FIELD,
35        hnsw::{HNSW, HnswMetadata, VECTOR_ID_FIELD},
36        ivf::storage::IVF_PARTITION_KEY,
37        quantizer::{IvfQuantizationStorage, Quantization, Quantizer},
38        storage::VectorStore,
39    },
40};
41use crate::{metrics::MetricsCollector, prefilter::PreFilter};
42
43#[derive(Clone, DeepSizeOf)]
44pub struct HNSWIndexOptions {
45    pub use_residual: bool,
46}
47
48#[derive(Clone, DeepSizeOf)]
49pub struct HNSWIndex<Q: Quantization> {
50    // Some(T) if the index is loaded, None otherwise
51    hnsw: Option<HNSW>,
52    storage: Option<Arc<Q::Storage>>,
53
54    // TODO: move these into IVFIndex after the refactor is complete
55    partition_storage: IvfQuantizationStorage<Q>,
56    partition_metadata: Option<Vec<HnswMetadata>>,
57
58    options: HNSWIndexOptions,
59}
60
61impl<Q: Quantization> Debug for HNSWIndex<Q> {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        self.hnsw.fmt(f)
64    }
65}
66
67impl<Q: Quantization> HNSWIndex<Q> {
68    pub async fn try_new(
69        reader: Arc<dyn Reader>,
70        aux_reader: Arc<dyn Reader>,
71        options: HNSWIndexOptions,
72    ) -> Result<Self> {
73        let reader =
74            PreviousFileReader::try_new_self_described_from_reader(reader.clone(), None).await?;
75
76        let partition_metadata = match reader.schema().metadata.get(IVF_PARTITION_KEY) {
77            Some(json) => {
78                let metadata: Vec<HnswMetadata> = serde_json::from_str(json)?;
79                Some(metadata)
80            }
81            None => None,
82        };
83
84        let ivf_store = IvfQuantizationStorage::open(aux_reader).await?;
85        Ok(Self {
86            hnsw: None,
87            storage: None,
88            partition_storage: ivf_store,
89            partition_metadata,
90            options,
91        })
92    }
93
94    pub fn quantizer(&self) -> &Quantizer {
95        self.partition_storage.quantizer()
96    }
97
98    pub fn metadata(&self) -> HnswMetadata {
99        self.partition_metadata.as_ref().unwrap()[0].clone()
100    }
101
102    fn get_partition_metadata(&self, partition_id: usize) -> Result<HnswMetadata> {
103        match self.partition_metadata {
104            Some(ref metadata) => Ok(metadata[partition_id].clone()),
105            None => Err(Error::index("No partition metadata found".to_string())),
106        }
107    }
108}
109
110#[async_trait]
111impl<Q: Quantization + Send + Sync + 'static> Index for HNSWIndex<Q> {
112    /// Cast to [Any].
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    /// Cast to [Index]
118    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
119        self
120    }
121
122    /// Cast to [VectorIndex]
123    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
124        Ok(self)
125    }
126
127    /// Retrieve index statistics as a JSON Value
128    fn statistics(&self) -> Result<serde_json::Value> {
129        Ok(json!({
130            "index_type": "HNSW",
131            "distance_type": self.partition_storage.distance_type().to_string(),
132        }))
133    }
134
135    async fn prewarm(&self) -> Result<()> {
136        // TODO: HNSW can (and should) support pre-warming
137        Ok(())
138    }
139
140    /// Get the type of the index
141    fn index_type(&self) -> IndexType {
142        IndexType::Vector
143    }
144
145    /// Read through the index and determine which fragment ids are covered by the index
146    ///
147    /// This is a kind of slow operation.  It's better to use the fragment_bitmap.  This
148    /// only exists for cases where the fragment_bitmap has become corrupted or missing.
149    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
150        unimplemented!()
151    }
152}
153
154#[async_trait]
155impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
156    #[instrument(level = "debug", skip_all, name = "HNSWIndex::search")]
157    async fn search(
158        &self,
159        query: &Query,
160        pre_filter: Arc<dyn PreFilter>,
161        metrics: &dyn MetricsCollector,
162    ) -> Result<RecordBatch> {
163        let hnsw = self
164            .hnsw
165            .as_ref()
166            .ok_or(Error::index("HNSW index not loaded".to_string()))?;
167
168        let storage = self
169            .storage
170            .as_ref()
171            .ok_or(Error::index("vector storage not loaded".to_string()))?;
172
173        let refine_factor = query.refine_factor.unwrap_or(1) as usize;
174        let k = query.k * refine_factor;
175
176        hnsw.search(
177            query.key.clone(),
178            k,
179            query.into(),
180            storage.as_ref(),
181            pre_filter,
182            metrics,
183        )
184    }
185
186    fn find_partitions(&self, _: &Query) -> Result<(UInt32Array, Float32Array)> {
187        unimplemented!("only for IVF")
188    }
189
190    fn total_partitions(&self) -> usize {
191        1
192    }
193
194    async fn search_in_partition(
195        &self,
196        _: usize,
197        _: &Query,
198        _: Arc<dyn PreFilter>,
199        _: &dyn MetricsCollector,
200    ) -> Result<RecordBatch> {
201        unimplemented!("only for IVF")
202    }
203
204    fn is_loadable(&self) -> bool {
205        true
206    }
207
208    fn use_residual(&self) -> bool {
209        self.options.use_residual
210    }
211
212    async fn load(
213        &self,
214        reader: Arc<dyn Reader>,
215        _offset: usize,
216        _length: usize,
217    ) -> Result<Box<dyn VectorIndex>> {
218        let schema = Schema::try_from(&arrow_schema::Schema::new(vec![
219            NEIGHBORS_FIELD.clone(),
220            VECTOR_ID_FIELD.clone(),
221        ]))?;
222
223        let reader = PreviousFileReader::try_new_from_reader(
224            reader.path(),
225            reader.clone(),
226            None,
227            schema,
228            0,
229            0,
230            2,
231            None,
232        )
233        .await?;
234
235        let storage = Arc::new(self.partition_storage.load_partition(0).await?);
236        let batch = reader.read_range(0..reader.len(), reader.schema()).await?;
237        let hnsw = HNSW::load(batch)?;
238
239        Ok(Box::new(Self {
240            hnsw: Some(hnsw),
241            storage: Some(storage),
242            partition_storage: self.partition_storage.clone(),
243            partition_metadata: self.partition_metadata.clone(),
244            options: self.options.clone(),
245        }))
246    }
247
248    async fn load_partition(
249        &self,
250        reader: Arc<dyn Reader>,
251        offset: usize,
252        length: usize,
253        partition_id: usize,
254    ) -> Result<Box<dyn VectorIndex>> {
255        let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
256
257        let metadata = self.get_partition_metadata(partition_id)?;
258        let storage = Arc::new(self.partition_storage.load_partition(partition_id).await?);
259        let batch = reader
260            .read_range(offset..offset + length, reader.schema())
261            .await?;
262        let mut schema = batch.schema_ref().as_ref().clone();
263        schema.metadata.insert(
264            HNSW::metadata_key().to_owned(),
265            serde_json::to_string(&metadata)?,
266        );
267        let batch = batch.with_schema(schema.into())?;
268        let hnsw = HNSW::load(batch)?;
269
270        Ok(Box::new(Self {
271            hnsw: Some(hnsw),
272            storage: Some(storage),
273            partition_storage: self.partition_storage.clone(),
274            partition_metadata: self.partition_metadata.clone(),
275            options: self.options.clone(),
276        }))
277    }
278
279    async fn to_batch_stream(&self, with_vector: bool) -> Result<SendableRecordBatchStream> {
280        let store = self
281            .storage
282            .as_ref()
283            .ok_or(Error::index("vector storage not loaded".to_string()))?;
284
285        let schema = if with_vector {
286            store.schema().clone()
287        } else {
288            let schema = store.schema();
289            let row_id_idx = schema.index_of(ROW_ID)?;
290            Arc::new(schema.project(&[row_id_idx])?)
291        };
292
293        let batches = store
294            .to_batches()?
295            .map(|b| {
296                let batch = b.project_by_schema(&schema)?;
297                Ok(batch)
298            })
299            .collect::<Vec<_>>();
300        let stream = futures::stream::iter(batches);
301        let stream = RecordBatchStreamAdapter::new(schema, stream);
302        Ok(Box::pin(stream))
303    }
304
305    fn num_rows(&self) -> u64 {
306        self.hnsw
307            .as_ref()
308            .map_or(0, |hnsw| hnsw.num_nodes(0) as u64)
309    }
310
311    fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_> {
312        Box::new(self.storage.as_ref().unwrap().row_ids())
313    }
314
315    async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
316        Err(Error::index(
317            "Remapping HNSW in this way not supported".to_string(),
318        ))
319    }
320
321    fn ivf_model(&self) -> &IvfModel {
322        unimplemented!("only for IVF")
323    }
324
325    fn quantizer(&self) -> Quantizer {
326        self.partition_storage.quantizer().clone()
327    }
328
329    fn partition_size(&self, _: usize) -> usize {
330        unimplemented!("only for IVF")
331    }
332
333    fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
334        (
335            SubIndexType::Hnsw,
336            self.partition_storage.quantizer().quantization_type(),
337        )
338    }
339
340    fn metric_type(&self) -> DistanceType {
341        self.partition_storage.distance_type()
342    }
343}