lance_index/vector/ivf/
builder.rs1use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::cast::AsArray;
10use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
11use futures::TryStreamExt;
12use object_store::path::Path;
13use snafu::location;
14
15use lance_core::error::{Error, Result};
16use lance_io::stream::RecordBatchStream;
17
18#[derive(Debug, Clone)]
20pub struct IvfBuildParams {
21 pub num_partitions: Option<usize>,
24
25 pub target_partition_size: Option<usize>,
29
30 pub max_iters: usize,
33
34 pub centroids: Option<Arc<FixedSizeListArray>>,
36
37 pub retrain: bool,
40
41 pub sample_rate: usize,
42
43 pub precomputed_partitions_file: Option<String>,
46
47 pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
53
54 pub shuffle_partition_batches: usize,
55
56 pub shuffle_partition_concurrency: usize,
57
58 pub storage_options: Option<HashMap<String, String>>,
60}
61
62impl Default for IvfBuildParams {
63 fn default() -> Self {
64 Self {
65 num_partitions: None,
66 target_partition_size: None,
67 max_iters: 50,
68 centroids: None,
69 retrain: false,
70 sample_rate: 256, precomputed_partitions_file: None,
72 precomputed_shuffle_buffers: None,
73 shuffle_partition_batches: 1024 * 10,
74 shuffle_partition_concurrency: 2,
75 storage_options: None,
76 }
77 }
78}
79
80impl IvfBuildParams {
81 pub fn new(num_partitions: usize) -> Self {
83 Self {
84 num_partitions: Some(num_partitions),
85 ..Default::default()
86 }
87 }
88
89 pub fn with_target_partition_size(target_partition_size: usize) -> Self {
90 Self {
91 target_partition_size: Some(target_partition_size),
92 ..Default::default()
93 }
94 }
95
96 pub fn try_with_centroids(
98 num_partitions: usize,
99 centroids: Arc<FixedSizeListArray>,
100 ) -> Result<Self> {
101 if num_partitions != centroids.len() {
102 return Err(Error::Index {
103 message: format!(
104 "IvfBuildParams::try_with_centroids: num_partitions {} != centroids.len() {}",
105 num_partitions,
106 centroids.len()
107 ),
108 location: location!(),
109 });
110 }
111 Ok(Self {
112 num_partitions: Some(num_partitions),
113 centroids: Some(centroids),
114 ..Default::default()
115 })
116 }
117}
118
/// Recommend how many IVF partitions to use for `num_rows` rows so that each
/// partition holds roughly `target_partition_size` rows.
///
/// The result is `num_rows / target_partition_size` (integer division), clamped
/// to the range `1..=4096`. A `target_partition_size` of `0` is treated as `1`
/// rather than panicking with a division by zero.
pub fn recommended_num_partitions(num_rows: usize, target_partition_size: usize) -> usize {
    // Upper bound on the partition count regardless of dataset size.
    const MAX_PARTITIONS: usize = 4096;
    // A zero target would divide by zero; interpret it as the smallest
    // meaningful partition size instead.
    let target_partition_size = target_partition_size.max(1);
    (num_rows / target_partition_size).clamp(1, MAX_PARTITIONS)
}
125
126pub async fn load_precomputed_partitions(
131 stream: impl RecordBatchStream + Unpin + 'static,
132 size_hint: usize,
133) -> Result<HashMap<u64, u32>> {
134 let partition_lookup = stream
135 .try_fold(HashMap::with_capacity(size_hint), |mut lookup, batch| {
136 let row_ids: &UInt64Array = batch
137 .column_by_name("row_id")
138 .expect("malformed partition file: missing row_id column")
139 .as_primitive();
140 let partitions: &UInt32Array = batch
141 .column_by_name("partition")
142 .expect("malformed partition file: missing partition column")
143 .as_primitive();
144 row_ids
145 .values()
146 .iter()
147 .zip(partitions.values().iter())
148 .for_each(|(row_id, partition)| {
149 lookup.insert(*row_id, *partition);
150 });
151 async move { Ok(lookup) }
152 })
153 .await?;
154
155 Ok(partition_lookup)
156}