iqdb-build 0.6.0

Parallel index construction, incremental updates, and merging - part of the iQDB family.
Documentation
//! Data-parallel sharded construction.
//!
//! A single [`Index`] cannot be built concurrently — every
//! [`insert`](iqdb_index::IndexCore::insert) takes `&mut self`. The bulk path is
//! instead to **split the input into shards and build one sub-index per shard in
//! parallel**, then combine them. This module adds the parallel-build half;
//! merging the shards into one index lands in a later phase (see
//! [`dev/ROADMAP.md`](../dev/ROADMAP.md)). Until then the shards are directly
//! usable by the engine, whose storage is itself sharded.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rayon::prelude::*;

use iqdb_index::Index;
use iqdb_types::Result;

use crate::builder::{BuildItem, IndexBuilder};

/// Point-in-time progress of a parallel build, handed to the callback
/// registered through [`IndexBuilder::on_progress`].
///
/// A snapshot is emitted whenever a shard finishes building, so over the
/// course of a build `shards_completed` counts up from `1` toward
/// `shards_total`. Shards are built concurrently: snapshots can be delivered
/// on different threads and may occasionally be observed slightly out of
/// order. Use `shards_completed` as a high-water mark (for example to drive a
/// progress bar) rather than relying on a strict sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BuildProgress {
    /// Number of shards whose construction has finished so far.
    pub shards_completed: usize,
    /// Total number of shards this build is split into.
    pub shards_total: usize,
}

impl<I: Index> IndexBuilder<I> {
    /// Set the shard count used by [`build_parallel`](IndexBuilder::build_parallel).
    ///
    /// The default is "auto" — one shard per available CPU
    /// ([`std::thread::available_parallelism`]). Set it explicitly to cap
    /// parallelism, to match the engine's shard layout, or for reproducible
    /// shard boundaries in tests. A count of `0` is treated as `1`, and the
    /// effective count never exceeds the number of items (no empty shards).
    ///
    /// This is a consuming, immutable setter: it returns a new builder.
    ///
    /// # Examples
    ///
    /// ```
    /// use iqdb_build::IndexBuilder;
    /// use iqdb_types::DistanceMetric;
    /// # use std::sync::Arc;
    /// # use iqdb_index::{Index, IndexCore, IndexStats};
    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
    /// # struct Flat { dim: usize, metric: DistanceMetric }
    /// # #[derive(Clone, Default)] struct FlatConfig;
    /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
    /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean).with_shards(4);
    /// assert_eq!(builder.shards(), Some(4));
    /// ```
    #[inline]
    #[must_use]
    pub fn with_shards(self, shards: usize) -> Self {
        // Take ownership of the builder, record the explicit shard count
        // (replacing "auto"), and hand the updated builder back. The value is
        // stored as given; clamping happens when the build resolves it.
        let mut builder = self;
        builder.shards = Some(shards);
        builder
    }

    /// The configured shard count, or `None` when it is left to "auto".
    ///
    /// # Examples
    ///
    /// ```
    /// # use iqdb_build::IndexBuilder;
    /// # use iqdb_types::DistanceMetric;
    /// # use std::sync::Arc;
    /// # use iqdb_index::{Index, IndexCore, IndexStats};
    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams, VectorId};
    /// # struct Flat { dim: usize, metric: DistanceMetric }
    /// # #[derive(Clone, Default)] struct FlatConfig;
    /// # impl IndexCore for Flat { fn insert(&mut self,_i:VectorId,_v:Arc<[f32]>,_m:Option<Metadata>)->Result<()>{Ok(())} fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{0} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric})} }
    /// let builder = IndexBuilder::<Flat>::new(8, DistanceMetric::Euclidean);
    /// assert_eq!(builder.shards(), None);
    /// ```
    #[inline]
    pub fn shards(&self) -> Option<usize> {
        self.shards
    }

    /// Register a callback invoked each time a shard finishes building during
    /// [`build_parallel`](IndexBuilder::build_parallel) (and the build phase of
    /// [`build_merged`](IndexBuilder::build_merged)).
    ///
    /// The callback receives a [`BuildProgress`] snapshot. It must be
    /// `Send + Sync` because shards build concurrently, so it may be called from
    /// several worker threads; keep it cheap and non-blocking (update an atomic,
    /// nudge a progress bar). The sequential [`build`](IndexBuilder::build) does
    /// not report progress.
    ///
    /// This is a consuming, immutable setter: it returns a new builder.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    /// use iqdb_build::IndexBuilder;
    /// use iqdb_types::{DistanceMetric, VectorId};
    /// # use iqdb_index::{Index, IndexCore, IndexStats};
    /// # use iqdb_types::{Hit, Metadata, Result, SearchParams};
    /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
    /// # #[derive(Clone, Default)] struct FlatConfig;
    /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
    /// # fn main() -> iqdb_types::Result<()> {
    /// let calls = Arc::new(AtomicUsize::new(0));
    /// let calls2 = Arc::clone(&calls);
    ///
    /// let items: Vec<_> = (0u64..100)
    ///     .map(|i| (VectorId::from(i), Arc::from([i as f32].as_slice()), None))
    ///     .collect();
    ///
    /// let _shards: Vec<Flat> = IndexBuilder::new(1, DistanceMetric::Euclidean)
    ///     .with_shards(4)
    ///     .on_progress(move |p| {
    ///         assert_eq!(p.shards_total, 4);
    ///         calls2.fetch_add(1, Ordering::Relaxed);
    ///     })
    ///     .build_parallel(items)?;
    ///
    /// // The callback fired once per shard.
    /// assert_eq!(calls.load(Ordering::Relaxed), 4);
    /// # Ok(()) }
    /// ```
    #[inline]
    #[must_use]
    pub fn on_progress<F>(self, callback: F) -> Self
    where
        F: Fn(BuildProgress) + Send + Sync + 'static,
    {
        // The callback is stored behind an `Arc`; any previously registered
        // callback is replaced.
        let mut builder = self;
        builder.progress = Some(Arc::new(callback));
        builder
    }

    /// Build the input as several independent sub-indexes, one per shard, in
    /// parallel.
    ///
    /// The items are split into contiguous, near-equal shards and each shard is
    /// constructed on rayon's work-stealing pool: every shard runs
    /// [`Index::new`] with this builder's `dim`/`metric`/`config` and inserts its
    /// slice through [`insert_batch`](iqdb_index::IndexCore::insert_batch). The
    /// returned `Vec` preserves shard order (shard 0 holds the first slice of
    /// input, and so on).
    ///
    /// Wall-clock build time drops roughly linearly with cores, because the
    /// shards share nothing. Combining them into a single index is the job of a
    /// later merge phase; meanwhile the shards are directly usable by the engine,
    /// whose storage is sharded.
    ///
    /// The shard count comes from [`with_shards`](IndexBuilder::with_shards), or
    /// is one-per-CPU when left to auto. It is capped at the item count, so you
    /// never get an empty shard (except the single empty index produced from
    /// empty input).
    ///
    /// # Errors
    ///
    /// The same construction and insertion errors as [`build`](IndexBuilder::build),
    /// surfaced from whichever shard hits them first:
    /// [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig),
    /// [`DimensionMismatch`](iqdb_types::IqdbError::DimensionMismatch), and
    /// [`Duplicate`](iqdb_types::IqdbError::Duplicate).
    ///
    /// Note that duplicate detection is **per shard**: two items with the same id
    /// in *different* shards are not caught here (they would collide at merge
    /// time). Within a shard, a repeated id errors as usual.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use iqdb_build::IndexBuilder;
    /// use iqdb_types::{DistanceMetric, VectorId};
    /// # use iqdb_index::{Index, IndexCore, IndexStats};
    /// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
    /// # struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
    /// # #[derive(Clone, Default)] struct FlatConfig;
    /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if v.len()!=self.dim { return Err(IqdbError::DimensionMismatch{expected:self.dim,found:v.len()});} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{ if dim==0 {return Err(IqdbError::InvalidConfig{reason:"dim must be > 0"});} Ok(Flat{dim,metric,rows:Vec::new()}) } }
    /// # fn main() -> iqdb_types::Result<()> {
    /// let items: Vec<_> = (0u64..1_000)
    ///     .map(|i| (VectorId::from(i), Arc::from([i as f32, 0.0].as_slice()), None))
    ///     .collect();
    ///
    /// // Build four shards in parallel.
    /// let shards: Vec<Flat> = IndexBuilder::new(2, DistanceMetric::Euclidean)
    ///     .with_shards(4)
    ///     .build_parallel(items)?;
    ///
    /// assert_eq!(shards.len(), 4);
    /// let total: usize = shards.iter().map(|s| s.len()).sum();
    /// assert_eq!(total, 1_000);
    /// # Ok(()) }
    /// ```
    pub fn build_parallel<It>(&self, items: It) -> Result<Vec<I>>
    where
        It: IntoIterator<Item = BuildItem>,
        I::Config: Send + Sync,
    {
        // Materialise the input so it can be measured and cut into owned,
        // contiguous per-shard chunks.
        let pending = items.into_iter().collect::<Vec<BuildItem>>();
        let wanted = self.resolve_shards(pending.len());
        let shard_inputs = split_items(pending, wanted);
        let shards_total = shard_inputs.len();

        // Shared across workers; only used to number progress snapshots.
        let finished = AtomicUsize::new(0);

        // Builds a single shard: fresh index, bulk insert, then (optionally)
        // report progress. Any error aborts this shard and is propagated.
        let build_shard = |rows: Vec<BuildItem>| -> Result<I> {
            let mut shard = I::new(self.dim(), self.metric(), self.config().clone())?;
            shard.insert_batch(rows)?;
            if let Some(notify) = self.progress.as_deref() {
                // `Relaxed` suffices: the counter is a plain tally and does
                // not guard any other memory.
                let shards_completed = finished.fetch_add(1, Ordering::Relaxed) + 1;
                notify(BuildProgress {
                    shards_completed,
                    shards_total,
                });
            }
            Ok(shard)
        };

        // Collecting into `Result<Vec<_>>` keeps shard order and surfaces the
        // first error encountered.
        shard_inputs.into_par_iter().map(build_shard).collect()
    }

    /// Compute the shard count a build over `len` items will actually use.
    ///
    /// Starts from the explicitly configured count (with `0` bumped to `1`),
    /// or — when none is configured — from
    /// [`std::thread::available_parallelism`], falling back to `1` if that
    /// query fails. The result is then capped at `len`, except that empty
    /// input still yields `1`, so the return value is always in
    /// `1..=max(len, 1)`.
    fn resolve_shards(&self, len: usize) -> usize {
        let wanted = self
            .shards()
            .map(|explicit| explicit.max(1))
            // Only probe the CPU count when the caller left the setting on "auto".
            .unwrap_or_else(|| {
                std::thread::available_parallelism().map_or(1, |cpus| cpus.get())
            });
        let ceiling = len.max(1);
        wanted.min(ceiling)
    }
}

/// Split `all` into `k` contiguous, near-equal owned chunks, preserving the
/// input order.
///
/// The first `len % k` chunks get one extra item, so sizes differ by at most
/// one. If `k` exceeds the number of items, the trailing chunks are empty.
/// A `k` of `0` is treated as `1` (a single chunk holding everything) instead
/// of panicking on a division by zero.
///
/// Always returns exactly `max(k, 1)` chunks.
fn split_items(all: Vec<BuildItem>, k: usize) -> Vec<Vec<BuildItem>> {
    // Guard the division below: `len / 0` would panic.
    let k = k.max(1);
    let len = all.len();
    let base = len / k;
    let remainder = len % k;

    // One shared draining iterator: each chunk pulls the next `take` items,
    // so chunks are contiguous and nothing is cloned.
    let mut iter = all.into_iter();
    (0..k)
        .map(|i| {
            let take = base + usize::from(i < remainder);
            iter.by_ref().take(take).collect::<Vec<BuildItem>>()
        })
        .collect()
}