iqdb_build/parallel.rs
1//! Data-parallel sharded construction.
2//!
3//! A single [`Index`] cannot be built concurrently — every
//! [`insert`](iqdb_index::IndexCore::insert) takes `&mut self`. Instead, the bulk
//! path **splits the input into shards and builds one sub-index per shard in
//! parallel**, then combines them. This module provides the parallel-build half;
7//! merging the shards into one index lands in a later phase (see
8//! [`dev/ROADMAP.md`](../dev/ROADMAP.md)). Until then the shards are directly
9//! usable by the engine, whose storage is itself sharded.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14use rayon::prelude::*;
15
16use iqdb_index::Index;
17use iqdb_types::Result;
18
19use crate::builder::{BuildItem, IndexBuilder};
20
/// Snapshot of parallel-build progress, handed to the callback registered
/// with [`IndexBuilder::on_progress`].
///
/// A snapshot is emitted every time a shard finishes, so over a full build
/// `shards_completed` runs from `1` up to `shards_total`. Shards finish on
/// different worker threads, so snapshots can be delivered from several
/// threads and occasionally out of order; a progress bar should keep the
/// largest `shards_completed` it has seen rather than assume a strict sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BuildProgress {
    /// Number of shards that have finished building so far.
    pub shards_completed: usize,
    /// Number of shards in this build.
    pub shards_total: usize,
}
36
37impl<I: Index> IndexBuilder<I> {
38 /// Set the shard count used by [`build_parallel`](IndexBuilder::build_parallel).
39 ///
40 /// The default is "auto" — one shard per available CPU
41 /// ([`std::thread::available_parallelism`]). Set it explicitly to cap
42 /// parallelism, to match the engine's shard layout, or for reproducible
43 /// shard boundaries in tests. A count of `0` is treated as `1`, and the
44 /// effective count never exceeds the number of items (no empty shards).
45 ///
46 /// This is a consuming, immutable setter: it returns a new builder.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use iqdb_build::IndexBuilder;
52 /// use iqdb_types::DistanceMetric;
53 /// # use std::sync::Arc;
54 /// # use iqdb_index::{Index, IndexCore, IndexStats};
55 /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
56 /// # struct Flat { dim: usize, metric: DistanceMetric }
57 /// # #[derive(Clone, Default)] struct FlatConfig;
58 /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
59 /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
60 /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean).with_shards(4);
61 /// assert_eq!(builder.shards(), Some(4));
62 /// ```
63 #[inline]
64 #[must_use]
65 pub fn with_shards(mut self, shards: usize) -> Self {
66 self.shards = Some(shards);
67 self
68 }
69
70 /// The configured shard count, or `None` when it is left to "auto".
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// # use iqdb_build::IndexBuilder;
76 /// # use iqdb_types::DistanceMetric;
77 /// # use std::sync::Arc;
78 /// # use iqdb_index::{Index, IndexCore, IndexStats};
79 /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
80 /// # struct Flat { dim: usize, metric: DistanceMetric }
81 /// # #[derive(Clone, Default)] struct FlatConfig;
82 /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
83 /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
84 /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean);
85 /// assert_eq!(builder.shards(), None);
86 /// ```
87 #[inline]
88 pub fn shards(&self) -> Option<usize> {
89 self.shards
90 }
91
92 /// Register a callback invoked each time a shard finishes building during
93 /// [`build_parallel`](IndexBuilder::build_parallel) (and the build phase of
94 /// [`build_merged`](IndexBuilder::build_merged)).
95 ///
96 /// The callback receives a [`BuildProgress`] snapshot. It must be
97 /// `Send + Sync` because shards build concurrently, so it may be called from
98 /// several worker threads; keep it cheap and non-blocking (update an atomic,
99 /// nudge a progress bar). The sequential [`build`](IndexBuilder::build) does
100 /// not report progress.
101 ///
102 /// This is a consuming, immutable setter: it returns a new builder.
103 ///
104 /// # Examples
105 ///
106 /// ```
107 /// use std::sync::Arc;
108 /// use std::sync::atomic::{AtomicUsize, Ordering};
109 /// use iqdb_build::IndexBuilder;
110 /// use iqdb_types::{DistanceMetric, VectorId};
111 /// # use iqdb_index::{Index, IndexCore, IndexStats};
112 /// # use iqdb_types::{Hit, Metadata, Result, SearchParams};
113 /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
114 /// # #[derive(Clone, Default)] struct FlatConfig;
115 /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
116 /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
117 /// # fn main() -> iqdb_types::Result<()> {
118 /// let calls = Arc::new(AtomicUsize::new(0));
119 /// let calls2 = Arc::clone(&calls);
120 ///
121 /// let items: Vec<_> = (0u64..100)
122 /// .map(|i| (VectorId::from(i), Arc::from([i as f32].as_slice()), None))
123 /// .collect();
124 ///
125 /// let _shards: Vec<Flat> = IndexBuilder::new(1, DistanceMetric::Euclidean)
126 /// .with_shards(4)
127 /// .on_progress(move |p| {
128 /// assert_eq!(p.shards_total, 4);
129 /// calls2.fetch_add(1, Ordering::Relaxed);
130 /// })
131 /// .build_parallel(items)?;
132 ///
133 /// // The callback fired once per shard.
134 /// assert_eq!(calls.load(Ordering::Relaxed), 4);
135 /// # Ok(()) }
136 /// ```
137 #[inline]
138 #[must_use]
139 pub fn on_progress<F>(mut self, callback: F) -> Self
140 where
141 F: Fn(BuildProgress) + Send + Sync + 'static,
142 {
143 self.progress = Some(Arc::new(callback));
144 self
145 }
146
147 /// Build the input as several independent sub-indexes, one per shard, in
148 /// parallel.
149 ///
150 /// The items are split into contiguous, near-equal shards and each shard is
151 /// constructed on rayon's work-stealing pool: every shard runs
152 /// [`Index::new`] with this builder's `dim`/`metric`/`config` and inserts its
153 /// slice through [`insert_batch`](iqdb_index::IndexCore::insert_batch). The
154 /// returned `Vec` preserves shard order (shard 0 holds the first slice of
155 /// input, and so on).
156 ///
157 /// Wall-clock build time drops roughly linearly with cores, because the
158 /// shards share nothing. Combining them into a single index is the job of a
159 /// later merge phase; meanwhile the shards are directly usable by the engine,
160 /// whose storage is sharded.
161 ///
162 /// The shard count comes from [`with_shards`](IndexBuilder::with_shards), or
163 /// is one-per-CPU when left to auto. It is capped at the item count, so you
164 /// never get an empty shard (except the single empty index produced from
165 /// empty input).
166 ///
167 /// # Errors
168 ///
169 /// The same construction and insertion errors as [`build`](IndexBuilder::build),
170 /// surfaced from whichever shard hits them first:
171 /// [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig),
172 /// [`DimensionMismatch`](iqdb_types::IqdbError::DimensionMismatch), and
173 /// [`Duplicate`](iqdb_types::IqdbError::Duplicate).
174 ///
175 /// Note that duplicate detection is **per shard**: two items with the same id
176 /// in *different* shards are not caught here (they would collide at merge
177 /// time). Within a shard, a repeated id errors as usual.
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use std::sync::Arc;
183 /// use iqdb_build::IndexBuilder;
184 /// use iqdb_types::{DistanceMetric, VectorId};
185 /// # use iqdb_index::{Index, IndexCore, IndexStats};
186 /// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
187 /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
188 /// # #[derive(Clone, Default)] struct FlatConfig;
189 /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if v.len()!=self.dim { return Err(IqdbError::DimensionMismatch{expected:self.dim,found:v.len()});} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
190 /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{ if dim==0 {return Err(IqdbError::InvalidConfig{reason:"dim must be > 0"});} Ok(Flat{dim,metric,rows:Vec::new()}) } }
191 /// # fn main() -> iqdb_types::Result<()> {
192 /// let items: Vec<_> = (0u64..1_000)
193 /// .map(|i| (VectorId::from(i), Arc::from([i as f32, 0.0].as_slice()), None))
194 /// .collect();
195 ///
196 /// // Build four shards in parallel.
197 /// let shards: Vec<Flat> = IndexBuilder::new(2, DistanceMetric::Euclidean)
198 /// .with_shards(4)
199 /// .build_parallel(items)?;
200 ///
201 /// assert_eq!(shards.len(), 4);
202 /// let total: usize = shards.iter().map(|s| s.len()).sum();
203 /// assert_eq!(total, 1_000);
204 /// # Ok(()) }
205 /// ```
206 pub fn build_parallel<It>(&self, items: It) -> Result<Vec<I>>
207 where
208 It: IntoIterator<Item = BuildItem>,
209 I::Config: Send + Sync,
210 {
211 let all: Vec<BuildItem> = items.into_iter().collect();
212 let shard_count = self.resolve_shards(all.len());
213 let chunks = split_items(all, shard_count);
214 let total = chunks.len();
215 let completed = AtomicUsize::new(0);
216
217 chunks
218 .into_par_iter()
219 .map(|chunk| {
220 let mut index = I::new(self.dim(), self.metric(), self.config().clone())?;
221 index.insert_batch(chunk)?;
222 if let Some(callback) = self.progress.as_deref() {
223 let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
224 callback(BuildProgress {
225 shards_completed: done,
226 shards_total: total,
227 });
228 }
229 Ok(index)
230 })
231 .collect()
232 }
233
234 /// Resolve the effective shard count: requested-or-auto, clamped to
235 /// `1..=len` (never zero, never more shards than items).
236 fn resolve_shards(&self, len: usize) -> usize {
237 let requested = match self.shards() {
238 Some(n) => n.max(1),
239 None => std::thread::available_parallelism()
240 .map(|n| n.get())
241 .unwrap_or(1),
242 };
243 requested.min(len.max(1))
244 }
245}
246
/// Split `all` into `k` contiguous, owned chunks whose sizes differ by at most
/// one.
///
/// The first `len % k` chunks each receive one extra item. Concatenating the
/// returned chunks in order reproduces `all` exactly. When `k > all.len()`, the
/// trailing chunks are empty.
///
/// A `k` of `0` is treated as `1`, so the result is a single chunk holding
/// everything. This replaces a divide-by-zero panic and matches the
/// "0 shards means 1" convention used elsewhere in this module.
fn split_items<T>(all: Vec<T>, k: usize) -> Vec<Vec<T>> {
    let k = k.max(1);
    let len = all.len();
    let (base, remainder) = (len / k, len % k);
    let mut rest = all.into_iter();
    (0..k)
        .map(|i| {
            // The first `remainder` chunks absorb the leftover items.
            let take = base + usize::from(i < remainder);
            rest.by_ref().take(take).collect()
        })
        .collect()
}