lance_index/vector/ivf/
builder.rs1use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::cast::AsArray;
10use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
11use futures::TryStreamExt;
12use object_store::path::Path;
13
14use lance_core::error::{Error, Result};
15use lance_io::stream::RecordBatchStream;
16
17#[derive(Debug, Clone)]
19pub struct IvfBuildParams {
20 pub num_partitions: Option<usize>,
23
24 pub target_partition_size: Option<usize>,
28
29 pub max_iters: usize,
32
33 pub centroids: Option<Arc<FixedSizeListArray>>,
35
36 pub retrain: bool,
39
40 pub sample_rate: usize,
41
42 pub precomputed_partitions_file: Option<String>,
45
46 pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
52
53 pub shuffle_partition_batches: usize,
54
55 pub shuffle_partition_concurrency: usize,
56
57 pub storage_options: Option<HashMap<String, String>>,
59}
60
61impl Default for IvfBuildParams {
62 fn default() -> Self {
63 Self {
64 num_partitions: None,
65 target_partition_size: None,
66 max_iters: 50,
67 centroids: None,
68 retrain: false,
69 sample_rate: 256, precomputed_partitions_file: None,
71 precomputed_shuffle_buffers: None,
72 shuffle_partition_batches: 1024 * 10,
73 shuffle_partition_concurrency: 2,
74 storage_options: None,
75 }
76 }
77}
78
79impl IvfBuildParams {
80 pub fn new(num_partitions: usize) -> Self {
82 Self {
83 num_partitions: Some(num_partitions),
84 ..Default::default()
85 }
86 }
87
88 pub fn with_target_partition_size(target_partition_size: usize) -> Self {
89 Self {
90 target_partition_size: Some(target_partition_size),
91 ..Default::default()
92 }
93 }
94
95 pub fn try_with_centroids(
97 num_partitions: usize,
98 centroids: Arc<FixedSizeListArray>,
99 ) -> Result<Self> {
100 if num_partitions != centroids.len() {
101 return Err(Error::index(format!(
102 "IvfBuildParams::try_with_centroids: num_partitions {} != centroids.len() {}",
103 num_partitions,
104 centroids.len()
105 )));
106 }
107 Ok(Self {
108 num_partitions: Some(num_partitions),
109 centroids: Some(centroids),
110 ..Default::default()
111 })
112 }
113}
114
/// Suggests how many IVF partitions to build so that each partition holds roughly
/// `target_partition_size` of the `num_rows` rows.
///
/// The result is `num_rows / target_partition_size` (rounded down), clamped to the
/// inclusive range `1..=4096`.
///
/// A `target_partition_size` of zero is treated as one. Without this guard the integer
/// division would panic.
pub fn recommended_num_partitions(num_rows: usize, target_partition_size: usize) -> usize {
    // Hard cap on the number of partitions this helper will ever recommend.
    const MAX_PARTITIONS: usize = 4096;
    // `max(1)` avoids a divide-by-zero panic for a zero target size.
    let divisor = target_partition_size.max(1);
    (num_rows / divisor).clamp(1, MAX_PARTITIONS)
}
121
122pub async fn load_precomputed_partitions(
127 stream: impl RecordBatchStream + Unpin + 'static,
128 size_hint: usize,
129) -> Result<HashMap<u64, u32>> {
130 let partition_lookup = stream
131 .try_fold(HashMap::with_capacity(size_hint), |mut lookup, batch| {
132 let row_ids: &UInt64Array = batch
133 .column_by_name("row_id")
134 .expect("malformed partition file: missing row_id column")
135 .as_primitive();
136 let partitions: &UInt32Array = batch
137 .column_by_name("partition")
138 .expect("malformed partition file: missing partition column")
139 .as_primitive();
140 row_ids
141 .values()
142 .iter()
143 .zip(partitions.values().iter())
144 .for_each(|(row_id, partition)| {
145 lookup.insert(*row_id, *partition);
146 });
147 async move { Ok(lookup) }
148 })
149 .await?;
150
151 Ok(partition_lookup)
152}