lance_index/vector/ivf/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Build IVF model
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::cast::AsArray;
10use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
11use futures::TryStreamExt;
12use object_store::path::Path;
13use snafu::location;
14
15use lance_core::error::{Error, Result};
16use lance_io::stream::RecordBatchStream;
17
18/// Parameters to build IVF partitions
19#[derive(Debug, Clone)]
20pub struct IvfBuildParams {
21    /// Deprecated: use `target_partition_size` instead.
22    /// Number of partitions to build.
23    pub num_partitions: Option<usize>,
24
25    /// Target partition size.
26    /// If set, the number of partitions will be computed based on the target partition size.
27    /// Otherwise, the `target_partition_size` will be set by index type.
28    pub target_partition_size: Option<usize>,
29
30    // ---- kmeans parameters
31    /// Max number of iterations to train kmeans.
32    pub max_iters: usize,
33
34    /// Use provided IVF centroids.
35    pub centroids: Option<Arc<FixedSizeListArray>>,
36
37    /// Retrain centroids.
38    /// If true, the centroids will be retrained based on provided `centroids`.
39    pub retrain: bool,
40
41    pub sample_rate: usize,
42
43    /// Precomputed partitions file (row_id -> partition_id)
44    /// mutually exclusive with `precomputed_shuffle_buffers`
45    pub precomputed_partitions_file: Option<String>,
46
47    /// Precomputed shuffle buffers (row_id -> partition_id, pq_code)
48    /// mutually exclusive with `precomputed_partitions_file`
49    /// requires `centroids` to be set
50    ///
51    /// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...])
52    pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
53
54    pub shuffle_partition_batches: usize,
55
56    pub shuffle_partition_concurrency: usize,
57
58    /// Storage options used to load precomputed partitions.
59    pub storage_options: Option<HashMap<String, String>>,
60}
61
62impl Default for IvfBuildParams {
63    fn default() -> Self {
64        Self {
65            num_partitions: None,
66            target_partition_size: None,
67            max_iters: 50,
68            centroids: None,
69            retrain: false,
70            sample_rate: 256, // See faiss
71            precomputed_partitions_file: None,
72            precomputed_shuffle_buffers: None,
73            shuffle_partition_batches: 1024 * 10,
74            shuffle_partition_concurrency: 2,
75            storage_options: None,
76        }
77    }
78}
79
80impl IvfBuildParams {
81    /// Create a new instance of `IvfBuildParams`.
82    pub fn new(num_partitions: usize) -> Self {
83        Self {
84            num_partitions: Some(num_partitions),
85            ..Default::default()
86        }
87    }
88
89    pub fn with_target_partition_size(target_partition_size: usize) -> Self {
90        Self {
91            target_partition_size: Some(target_partition_size),
92            ..Default::default()
93        }
94    }
95
96    /// Create a new instance of [`IvfBuildParams`] with centroids.
97    pub fn try_with_centroids(
98        num_partitions: usize,
99        centroids: Arc<FixedSizeListArray>,
100    ) -> Result<Self> {
101        if num_partitions != centroids.len() {
102            return Err(Error::Index {
103                message: format!(
104                    "IvfBuildParams::try_with_centroids: num_partitions {} != centroids.len() {}",
105                    num_partitions,
106                    centroids.len()
107                ),
108                location: location!(),
109            });
110        }
111        Ok(Self {
112            num_partitions: Some(num_partitions),
113            centroids: Some(centroids),
114            ..Default::default()
115        })
116    }
117}
118
/// Recommend how many IVF partitions to build for `num_rows` rows so that each
/// partition holds roughly `target_partition_size` rows.
///
/// The result is always within `1..=4096`. A `target_partition_size` of `0`
/// is treated as `1` instead of panicking with a division by zero.
pub fn recommended_num_partitions(num_rows: usize, target_partition_size: usize) -> usize {
    // The maximum number of partitions is 4096 to avoid slow KMeans clustering,
    // bump it once we have better clustering algorithms.
    const MAX_PARTITIONS: usize = 4096;
    // Floor the divisor at 1: integer division by zero panics.
    let partition_size = target_partition_size.max(1);
    (num_rows / partition_size).clamp(1, MAX_PARTITIONS)
}
125
126/// Load precomputed partitions from disk.
127///
128/// Currently, because `Dataset` is not cleanly refactored from `lance` to `lance-core`,
129/// we have to use `RecordBatchStream` as parameter.
130pub async fn load_precomputed_partitions(
131    stream: impl RecordBatchStream + Unpin + 'static,
132    size_hint: usize,
133) -> Result<HashMap<u64, u32>> {
134    let partition_lookup = stream
135        .try_fold(HashMap::with_capacity(size_hint), |mut lookup, batch| {
136            let row_ids: &UInt64Array = batch
137                .column_by_name("row_id")
138                .expect("malformed partition file: missing row_id column")
139                .as_primitive();
140            let partitions: &UInt32Array = batch
141                .column_by_name("partition")
142                .expect("malformed partition file: missing partition column")
143                .as_primitive();
144            row_ids
145                .values()
146                .iter()
147                .zip(partitions.values().iter())
148                .for_each(|(row_id, partition)| {
149                    lookup.insert(*row_id, *partition);
150                });
151            async move { Ok(lookup) }
152        })
153        .await?;
154
155    Ok(partition_lookup)
156}