Skip to main content

lance_index/vector/ivf/
builder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Build IVF model

6use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::cast::AsArray;
10use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
11use futures::TryStreamExt;
12use object_store::path::Path;
13
14use lance_core::error::{Error, Result};
15use lance_io::stream::RecordBatchStream;
16
17/// Parameters to build IVF partitions
18#[derive(Debug, Clone)]
19pub struct IvfBuildParams {
20    /// Deprecated: use `target_partition_size` instead.
21    /// Number of partitions to build.
22    pub num_partitions: Option<usize>,
23
24    /// Target partition size.
25    /// If set, the number of partitions will be computed based on the target partition size.
26    /// Otherwise, the `target_partition_size` will be set by index type.
27    pub target_partition_size: Option<usize>,
28
29    // ---- kmeans parameters
30    /// Max number of iterations to train kmeans.
31    pub max_iters: usize,
32
33    /// Use provided IVF centroids.
34    pub centroids: Option<Arc<FixedSizeListArray>>,
35
36    /// Retrain centroids.
37    /// If true, the centroids will be retrained based on provided `centroids`.
38    pub retrain: bool,
39
40    pub sample_rate: usize,
41
42    /// Precomputed partitions file (row_id -> partition_id)
43    /// mutually exclusive with `precomputed_shuffle_buffers`
44    pub precomputed_partitions_file: Option<String>,
45
46    /// Precomputed shuffle buffers (row_id -> partition_id, pq_code)
47    /// mutually exclusive with `precomputed_partitions_file`
48    /// requires `centroids` to be set
49    ///
50    /// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...])
51    pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
52
53    pub shuffle_partition_batches: usize,
54
55    pub shuffle_partition_concurrency: usize,
56
57    /// Storage options used to load precomputed partitions.
58    pub storage_options: Option<HashMap<String, String>>,
59}
60
61impl Default for IvfBuildParams {
62    fn default() -> Self {
63        Self {
64            num_partitions: None,
65            target_partition_size: None,
66            max_iters: 50,
67            centroids: None,
68            retrain: false,
69            sample_rate: 256, // See faiss
70            precomputed_partitions_file: None,
71            precomputed_shuffle_buffers: None,
72            shuffle_partition_batches: 1024 * 10,
73            shuffle_partition_concurrency: 2,
74            storage_options: None,
75        }
76    }
77}
78
79impl IvfBuildParams {
80    /// Create a new instance of `IvfBuildParams`.
81    pub fn new(num_partitions: usize) -> Self {
82        Self {
83            num_partitions: Some(num_partitions),
84            ..Default::default()
85        }
86    }
87
88    pub fn with_target_partition_size(target_partition_size: usize) -> Self {
89        Self {
90            target_partition_size: Some(target_partition_size),
91            ..Default::default()
92        }
93    }
94
95    /// Create a new instance of [`IvfBuildParams`] with centroids.
96    pub fn try_with_centroids(
97        num_partitions: usize,
98        centroids: Arc<FixedSizeListArray>,
99    ) -> Result<Self> {
100        if num_partitions != centroids.len() {
101            return Err(Error::index(format!(
102                "IvfBuildParams::try_with_centroids: num_partitions {} != centroids.len() {}",
103                num_partitions,
104                centroids.len()
105            )));
106        }
107        Ok(Self {
108            num_partitions: Some(num_partitions),
109            centroids: Some(centroids),
110            ..Default::default()
111        })
112    }
113}
114
/// Recommend how many IVF partitions to build for `num_rows` rows.
///
/// The aim is for each partition to hold about `target_partition_size` rows.
///
/// The result is `num_rows / target_partition_size` (integer division, rounded down),
/// clamped to the range `1..=4096`. A `target_partition_size` of zero is treated as
/// one, so the function never panics with a division by zero.
pub fn recommended_num_partitions(num_rows: usize, target_partition_size: usize) -> usize {
    // The maximum number of partitions is 4096 to avoid slow KMeans clustering,
    // bump it once we have better clustering algorithms.
    const MAX_PARTITIONS: usize = 4096;
    // A zero target would otherwise panic on the division below.
    let target_partition_size = target_partition_size.max(1);
    (num_rows / target_partition_size).clamp(1, MAX_PARTITIONS)
}
121
122/// Load precomputed partitions from disk.
123///
124/// Currently, because `Dataset` is not cleanly refactored from `lance` to `lance-core`,
125/// we have to use `RecordBatchStream` as parameter.
126pub async fn load_precomputed_partitions(
127    stream: impl RecordBatchStream + Unpin + 'static,
128    size_hint: usize,
129) -> Result<HashMap<u64, u32>> {
130    let partition_lookup = stream
131        .try_fold(HashMap::with_capacity(size_hint), |mut lookup, batch| {
132            let row_ids: &UInt64Array = batch
133                .column_by_name("row_id")
134                .expect("malformed partition file: missing row_id column")
135                .as_primitive();
136            let partitions: &UInt32Array = batch
137                .column_by_name("partition")
138                .expect("malformed partition file: missing partition column")
139                .as_primitive();
140            row_ids
141                .values()
142                .iter()
143                .zip(partitions.values().iter())
144                .for_each(|(row_id, partition)| {
145                    lookup.insert(*row_id, *partition);
146                });
147            async move { Ok(lookup) }
148        })
149        .await?;
150
151    Ok(partition_lookup)
152}