Skip to main content

iqdb_build/
parallel.rs

//! Data-parallel sharded construction.
//!
//! A single [`Index`] cannot be built concurrently: every
//! [`insert`](iqdb_index::IndexCore::insert) takes `&mut self`. The bulk path
//! therefore **splits the input into shards and builds one sub-index per shard
//! in parallel**, then combines them. This module provides the parallel-build
//! half; merging the shards into one index lands in a later phase (see
//! [`dev/ROADMAP.md`](../dev/ROADMAP.md)). Until then, the shards are directly
//! usable by the engine, whose storage is itself sharded.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14use rayon::prelude::*;
15
16use iqdb_index::Index;
17use iqdb_types::Result;
18
19use crate::builder::{BuildItem, IndexBuilder};
20
21/// A progress snapshot delivered to the callback registered with
22/// [`IndexBuilder::on_progress`] as parallel construction proceeds.
23///
24/// One snapshot is reported each time a shard finishes building, so
25/// `shards_completed` climbs from `1` to `shards_total`. Because shards build
26/// concurrently, snapshots may arrive on different threads and (rarely) slightly
27/// out of order; treat `shards_completed` as a monotonic high-water count for a
28/// progress bar, not as a strict sequence.
29#[derive(Clone, Copy, Debug, PartialEq, Eq)]
30pub struct BuildProgress {
31    /// How many shards have finished building so far.
32    pub shards_completed: usize,
33    /// The total number of shards in this build.
34    pub shards_total: usize,
35}
36
37impl<I: Index> IndexBuilder<I> {
38    /// Set the shard count used by [`build_parallel`](IndexBuilder::build_parallel).
39    ///
40    /// The default is "auto" — one shard per available CPU
41    /// ([`std::thread::available_parallelism`]). Set it explicitly to cap
42    /// parallelism, to match the engine's shard layout, or for reproducible
43    /// shard boundaries in tests. A count of `0` is treated as `1`, and the
44    /// effective count never exceeds the number of items (no empty shards).
45    ///
46    /// This is a consuming, immutable setter: it returns a new builder.
47    ///
48    /// # Examples
49    ///
50    /// ```
51    /// use iqdb_build::IndexBuilder;
52    /// use iqdb_types::DistanceMetric;
53    /// # use std::sync::Arc;
54    /// # use iqdb_index::{Index, IndexCore, IndexStats};
55    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
56    /// # struct Flat { dim: usize, metric: DistanceMetric }
57    /// # #[derive(Clone, Default)] struct FlatConfig;
58    /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
59    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
60    /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean).with_shards(4);
61    /// assert_eq!(builder.shards(), Some(4));
62    /// ```
63    #[inline]
64    #[must_use]
65    pub fn with_shards(mut self, shards: usize) -> Self {
66        self.shards = Some(shards);
67        self
68    }
69
70    /// The configured shard count, or `None` when it is left to "auto".
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// # use iqdb_build::IndexBuilder;
76    /// # use iqdb_types::DistanceMetric;
77    /// # use std::sync::Arc;
78    /// # use iqdb_index::{Index, IndexCore, IndexStats};
79    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
80    /// # struct Flat { dim: usize, metric: DistanceMetric }
81    /// # #[derive(Clone, Default)] struct FlatConfig;
82    /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
83    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
84    /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean);
85    /// assert_eq!(builder.shards(), None);
86    /// ```
87    #[inline]
88    pub fn shards(&self) -> Option<usize> {
89        self.shards
90    }
91
92    /// Register a callback invoked each time a shard finishes building during
93    /// [`build_parallel`](IndexBuilder::build_parallel) (and the build phase of
94    /// [`build_merged`](IndexBuilder::build_merged)).
95    ///
96    /// The callback receives a [`BuildProgress`] snapshot. It must be
97    /// `Send + Sync` because shards build concurrently, so it may be called from
98    /// several worker threads; keep it cheap and non-blocking (update an atomic,
99    /// nudge a progress bar). The sequential [`build`](IndexBuilder::build) does
100    /// not report progress.
101    ///
102    /// This is a consuming, immutable setter: it returns a new builder.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use std::sync::Arc;
108    /// use std::sync::atomic::{AtomicUsize, Ordering};
109    /// use iqdb_build::IndexBuilder;
110    /// use iqdb_types::{DistanceMetric, VectorId};
111    /// # use iqdb_index::{Index, IndexCore, IndexStats};
112    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams};
113    /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
114    /// # #[derive(Clone, Default)] struct FlatConfig;
115    /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
116    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
117    /// # fn main() -> iqdb_types::Result<()> {
118    /// let calls = Arc::new(AtomicUsize::new(0));
119    /// let calls2 = Arc::clone(&calls);
120    ///
121    /// let items: Vec<_> = (0u64..100)
122    ///     .map(|i| (VectorId::from(i), Arc::from([i as f32].as_slice()), None))
123    ///     .collect();
124    ///
125    /// let _shards: Vec<Flat> = IndexBuilder::new(1, DistanceMetric::Euclidean)
126    ///     .with_shards(4)
127    ///     .on_progress(move |p| {
128    ///         assert_eq!(p.shards_total, 4);
129    ///         calls2.fetch_add(1, Ordering::Relaxed);
130    ///     })
131    ///     .build_parallel(items)?;
132    ///
133    /// // The callback fired once per shard.
134    /// assert_eq!(calls.load(Ordering::Relaxed), 4);
135    /// # Ok(()) }
136    /// ```
137    #[inline]
138    #[must_use]
139    pub fn on_progress<F>(mut self, callback: F) -> Self
140    where
141        F: Fn(BuildProgress) + Send + Sync + 'static,
142    {
143        self.progress = Some(Arc::new(callback));
144        self
145    }
146
147    /// Build the input as several independent sub-indexes, one per shard, in
148    /// parallel.
149    ///
150    /// The items are split into contiguous, near-equal shards and each shard is
151    /// constructed on rayon's work-stealing pool: every shard runs
152    /// [`Index::new`] with this builder's `dim`/`metric`/`config` and inserts its
153    /// slice through [`insert_batch`](iqdb_index::IndexCore::insert_batch). The
154    /// returned `Vec` preserves shard order (shard 0 holds the first slice of
155    /// input, and so on).
156    ///
157    /// Wall-clock build time drops roughly linearly with cores, because the
158    /// shards share nothing. Combining them into a single index is the job of a
159    /// later merge phase; meanwhile the shards are directly usable by the engine,
160    /// whose storage is sharded.
161    ///
162    /// The shard count comes from [`with_shards`](IndexBuilder::with_shards), or
163    /// is one-per-CPU when left to auto. It is capped at the item count, so you
164    /// never get an empty shard (except the single empty index produced from
165    /// empty input).
166    ///
167    /// # Errors
168    ///
169    /// The same construction and insertion errors as [`build`](IndexBuilder::build),
170    /// surfaced from whichever shard hits them first:
171    /// [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig),
172    /// [`DimensionMismatch`](iqdb_types::IqdbError::DimensionMismatch), and
173    /// [`Duplicate`](iqdb_types::IqdbError::Duplicate).
174    ///
175    /// Note that duplicate detection is **per shard**: two items with the same id
176    /// in *different* shards are not caught here (they would collide at merge
177    /// time). Within a shard, a repeated id errors as usual.
178    ///
179    /// # Examples
180    ///
181    /// ```
182    /// use std::sync::Arc;
183    /// use iqdb_build::IndexBuilder;
184    /// use iqdb_types::{DistanceMetric, VectorId};
185    /// # use iqdb_index::{Index, IndexCore, IndexStats};
186    /// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
187    /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
188    /// # #[derive(Clone, Default)] struct FlatConfig;
189    /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if v.len()!=self.dim { return Err(IqdbError::DimensionMismatch{expected:self.dim,found:v.len()});} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
190    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{ if dim==0 {return Err(IqdbError::InvalidConfig{reason:"dim must be > 0"});} Ok(Flat{dim,metric,rows:Vec::new()}) } }
191    /// # fn main() -> iqdb_types::Result<()> {
192    /// let items: Vec<_> = (0u64..1_000)
193    ///     .map(|i| (VectorId::from(i), Arc::from([i as f32, 0.0].as_slice()), None))
194    ///     .collect();
195    ///
196    /// // Build four shards in parallel.
197    /// let shards: Vec<Flat> = IndexBuilder::new(2, DistanceMetric::Euclidean)
198    ///     .with_shards(4)
199    ///     .build_parallel(items)?;
200    ///
201    /// assert_eq!(shards.len(), 4);
202    /// let total: usize = shards.iter().map(|s| s.len()).sum();
203    /// assert_eq!(total, 1_000);
204    /// # Ok(()) }
205    /// ```
206    pub fn build_parallel<It>(&self, items: It) -> Result<Vec<I>>
207    where
208        It: IntoIterator<Item = BuildItem>,
209        I::Config: Send + Sync,
210    {
211        let all: Vec<BuildItem> = items.into_iter().collect();
212        let shard_count = self.resolve_shards(all.len());
213        let chunks = split_items(all, shard_count);
214        let total = chunks.len();
215        let completed = AtomicUsize::new(0);
216
217        chunks
218            .into_par_iter()
219            .map(|chunk| {
220                let mut index = I::new(self.dim(), self.metric(), self.config().clone())?;
221                index.insert_batch(chunk)?;
222                if let Some(callback) = self.progress.as_deref() {
223                    let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
224                    callback(BuildProgress {
225                        shards_completed: done,
226                        shards_total: total,
227                    });
228                }
229                Ok(index)
230            })
231            .collect()
232    }
233
234    /// Resolve the effective shard count: requested-or-auto, clamped to
235    /// `1..=len` (never zero, never more shards than items).
236    fn resolve_shards(&self, len: usize) -> usize {
237        let requested = match self.shards() {
238            Some(n) => n.max(1),
239            None => std::thread::available_parallelism()
240                .map(|n| n.get())
241                .unwrap_or(1),
242        };
243        requested.min(len.max(1))
244    }
245}
246
/// Split `all` into `k` contiguous, near-equal owned chunks, preserving order.
///
/// The first `len % k` chunks receive one extra item, so chunk sizes differ by
/// at most one. Exactly `k` chunks are returned. If `k` exceeds the item count,
/// the trailing chunks are empty, so callers clamp `k` first.
///
/// A `k` of `0` is treated as `1` instead of panicking on division by zero.
/// This matches the "`0` means `1`" rule of `IndexBuilder::with_shards`.
///
/// Generic over the item type because the split never inspects the items.
fn split_items<T>(all: Vec<T>, k: usize) -> Vec<Vec<T>> {
    let k = k.max(1);
    let len = all.len();
    let (base, remainder) = (len / k, len % k);
    let mut items = all.into_iter();
    // `Range::map` has an exact size hint, so `collect` allocates `k` slots once.
    (0..k)
        .map(|i| {
            let take = base + usize::from(i < remainder);
            items.by_ref().take(take).collect()
        })
        .collect()
}