Skip to main content

iqdb_build/
merge.rs

//! Combining independently built sub-indexes into one.
//!
//! [`build_parallel`](IndexBuilder::build_parallel) produces a `Vec` of shards;
//! merging folds them back into a single index. The *mechanism* differs per
//! backend — a flat index appends rows, an IVF index extends its posting lists,
//! a graph index re-runs its boundary heuristics — so it cannot be expressed
//! through the read-only [`IndexCore`](iqdb_index::IndexCore) surface. The
//! [`Mergeable`] trait is the seam: a backend opts in by saying how to absorb
//! another instance of itself, and [`merge`] / [`IndexBuilder::build_merged`]
//! orchestrate the fold.
11
12use iqdb_index::Index;
13use iqdb_types::Result;
14
15use crate::builder::{BuildItem, IndexBuilder};
16
17/// A backend that can absorb another instance of itself.
18///
19/// Implement this for an [`Index`] so [`merge`] and
20/// [`IndexBuilder::build_merged`] can combine sharded builds into one index. The
21/// trait deliberately takes `other` **by value**: merging consumes the source,
22/// letting an implementation move its storage rather than copy it.
23///
24/// # Contract
25///
26/// - **Same shape.** `other` must share `self`'s `dim` and `metric`. An
27///   implementation should return
28///   [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig) on a mismatch rather
29///   than silently producing a corrupt index.
30/// - **Set union.** After `self.merge(other)?`, every id searchable in either
31///   input is searchable in `self`. This is where cross-shard id collisions
32///   surface: if both inputs hold the same id, an implementation returns
33///   [`Duplicate`](iqdb_types::IqdbError::Duplicate).
34/// - **Failure is observable, not silent.** On `Err`, `self` may be left
35///   partially merged; the operation is not transactional. Callers that need the
36///   original back should merge into a clone.
37///
38/// # Examples
39///
40/// A flat index merges by appending the other's rows:
41///
42/// ```
43/// use std::sync::Arc;
44/// use iqdb_build::{Mergeable, merge};
45/// use iqdb_types::{DistanceMetric, VectorId};
46/// # use iqdb_index::{Index, IndexCore, IndexStats};
47/// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
48/// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
49/// # #[derive(Clone, Default)] struct FlatConfig;
50/// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
51/// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
52/// impl Mergeable for Flat {
53///     fn merge(&mut self, other: Self) -> Result<()> {
54///         if other.dim() != self.dim() || other.metric() != self.metric() {
55///             return Err(IqdbError::InvalidConfig { reason: "merge shape mismatch" });
56///         }
57///         for (id, vector) in other.rows {
58///             self.insert(id, vector, None)?; // re-checks duplicates
59///         }
60///         Ok(())
61///     }
62/// }
63/// # fn main() -> iqdb_types::Result<()> {
64/// let mut a = Flat::new(1, DistanceMetric::Euclidean, FlatConfig::default())?;
65/// a.insert(VectorId::from(1u64), Arc::from([0.0_f32].as_slice()), None)?;
66/// let mut b = Flat::new(1, DistanceMetric::Euclidean, FlatConfig::default())?;
67/// b.insert(VectorId::from(2u64), Arc::from([1.0_f32].as_slice()), None)?;
68///
69/// a.merge(b)?;
70/// assert_eq!(a.len(), 2);
71/// # Ok(()) }
72/// ```
73pub trait Mergeable: Index {
74    /// Absorb `other` into `self`, leaving `self` searchable over the union of
75    /// both. See the [trait contract](Mergeable#contract) for the guarantees and
76    /// failure modes.
77    fn merge(&mut self, other: Self) -> Result<()>;
78}
79
80/// Fold a collection of sub-indexes into one by merging them in order.
81///
82/// Returns `Ok(None)` for an empty input, and otherwise merges every later index
83/// into the first and returns it. This is the natural companion to
84/// [`IndexBuilder::build_parallel`]: build shards, then `merge` them.
85///
86/// # Errors
87///
88/// Propagates the first [`Mergeable::merge`] error (for example
89/// [`Duplicate`](iqdb_types::IqdbError::Duplicate) on a cross-shard id
90/// collision). Because `merge` is not transactional, a failure partway leaves
91/// the accumulator partially merged; the error is returned and the partial
92/// accumulator dropped.
93///
94/// # Examples
95///
96/// ```
97/// # use std::sync::Arc;
98/// # use iqdb_build::{IndexBuilder, Mergeable, merge};
99/// # use iqdb_types::{DistanceMetric, IqdbError, VectorId};
100/// # use iqdb_index::{Index, IndexCore, IndexStats};
101/// # use iqdb_types::{Hit, Metadata, Result, SearchParams};
102/// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
103/// # #[derive(Clone, Default)] struct FlatConfig;
104/// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
105/// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
106/// # impl Mergeable for Flat { fn merge(&mut self, other: Self) -> Result<()> { for (id, v) in other.rows { self.insert(id, v, None)?; } Ok(()) } }
107/// # fn main() -> iqdb_types::Result<()> {
108/// let items: Vec<_> = (0u64..100).map(|i| (VectorId::from(i), Arc::from([i as f32].as_slice()), None)).collect();
109/// let shards: Vec<Flat> = IndexBuilder::new(1, DistanceMetric::Euclidean).with_shards(4).build_parallel(items)?;
110///
111/// let one = merge(shards)?.expect("non-empty");
112/// assert_eq!(one.len(), 100);
113/// # Ok(()) }
114/// ```
115pub fn merge<I: Mergeable>(indexes: Vec<I>) -> Result<Option<I>> {
116    let mut iter = indexes.into_iter();
117    let Some(mut accumulator) = iter.next() else {
118        return Ok(None);
119    };
120    for next in iter {
121        accumulator.merge(next)?;
122    }
123    Ok(Some(accumulator))
124}
125
126impl<I: Mergeable> IndexBuilder<I> {
127    /// Build the input in parallel and merge the shards into a single index.
128    ///
129    /// This is the full bulk pipeline — *split → build in parallel → merge* — in
130    /// one call. It runs [`build_parallel`](IndexBuilder::build_parallel) (so the
131    /// shard count, [`with_shards`](IndexBuilder::with_shards), and any
132    /// [`on_progress`](IndexBuilder::on_progress) callback all apply), then folds
133    /// the shards with [`merge`].
134    ///
135    /// # Errors
136    ///
137    /// Any error from the parallel build ([`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig),
138    /// [`DimensionMismatch`](iqdb_types::IqdbError::DimensionMismatch), within-shard
139    /// [`Duplicate`](iqdb_types::IqdbError::Duplicate)) or from
140    /// [`Mergeable::merge`] (notably a cross-shard `Duplicate`).
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use std::sync::Arc;
146    /// use iqdb_build::{IndexBuilder, Mergeable};
147    /// use iqdb_types::{DistanceMetric, VectorId};
148    /// # use iqdb_index::{Index, IndexCore, IndexStats};
149    /// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
150    /// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
151    /// # #[derive(Clone, Default)] struct FlatConfig;
152    /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if v.len()!=self.dim {return Err(IqdbError::DimensionMismatch{expected:self.dim,found:v.len()});} if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
153    /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{ if dim==0 {return Err(IqdbError::InvalidConfig{reason:"dim must be > 0"});} Ok(Flat{dim,metric,rows:Vec::new()}) } }
154    /// # impl Mergeable for Flat { fn merge(&mut self, other: Self) -> Result<()> { for (id, v) in other.rows { self.insert(id, v, None)?; } Ok(()) } }
155    /// # fn main() -> iqdb_types::Result<()> {
156    /// let items: Vec<_> = (0u64..1_000)
157    ///     .map(|i| (VectorId::from(i), Arc::from([i as f32, 0.0].as_slice()), None))
158    ///     .collect();
159    ///
160    /// // One call: shard, build across cores, merge back to a single index.
161    /// let index: Flat = IndexBuilder::new(2, DistanceMetric::Euclidean)
162    ///     .with_shards(8)
163    ///     .build_merged(items)?;
164    ///
165    /// assert_eq!(index.len(), 1_000);
166    /// # Ok(()) }
167    /// ```
168    pub fn build_merged<It>(&self, items: It) -> Result<I>
169    where
170        It: IntoIterator<Item = BuildItem>,
171        I::Config: Send + Sync,
172    {
173        let shards = self.build_parallel(items)?;
174        match merge(shards)? {
175            Some(index) => Ok(index),
176            // `build_parallel` always yields at least one shard, so this arm is
177            // unreachable in practice; constructing an empty index keeps the
178            // function total without an `unwrap`/`expect`.
179            None => I::new(self.dim, self.metric, self.config.clone()),
180        }
181    }
182}