iqdb_build/merge.rs
1//! Combining independently built sub-indexes into one.
2//!
3//! [`build_parallel`](IndexBuilder::build_parallel) produces a `Vec` of shards;
4//! merging folds them back into a single index. The *mechanism* differs per
5//! backend — a flat index appends rows, an IVF index extends its posting lists,
6//! a graph index re-runs its boundary heuristics — so it cannot be expressed
7//! through the read-only [`IndexCore`](iqdb_index::IndexCore) surface. The
8//! [`Mergeable`] trait is the seam: a backend opts in by saying how to absorb
9//! another instance of itself, and [`merge`] / [`IndexBuilder::build_merged`]
10//! orchestrate the fold.
11
12use iqdb_index::Index;
13use iqdb_types::Result;
14
15use crate::builder::{BuildItem, IndexBuilder};
16
17/// A backend that can absorb another instance of itself.
18///
19/// Implement this for an [`Index`] so [`merge`] and
20/// [`IndexBuilder::build_merged`] can combine sharded builds into one index. The
21/// trait deliberately takes `other` **by value**: merging consumes the source,
22/// letting an implementation move its storage rather than copy it.
23///
24/// # Contract
25///
26/// - **Same shape.** `other` must share `self`'s `dim` and `metric`. An
27/// implementation should return
28/// [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig) on a mismatch rather
29/// than silently producing a corrupt index.
30/// - **Set union.** After `self.merge(other)?`, every id searchable in either
31/// input is searchable in `self`. This is where cross-shard id collisions
32/// surface: if both inputs hold the same id, an implementation returns
33/// [`Duplicate`](iqdb_types::IqdbError::Duplicate).
34/// - **Failure is observable, not silent.** On `Err`, `self` may be left
35/// partially merged; the operation is not transactional. Callers that need the
36/// original back should merge into a clone.
37///
38/// # Examples
39///
40/// A flat index merges by appending the other's rows:
41///
42/// ```
43/// use std::sync::Arc;
44/// use iqdb_build::{Mergeable, merge};
45/// use iqdb_types::{DistanceMetric, VectorId};
46/// # use iqdb_index::{Index, IndexCore, IndexStats};
47/// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
48/// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
49/// # #[derive(Clone, Default)] struct FlatConfig;
50/// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
51/// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
52/// impl Mergeable for Flat {
53/// fn merge(&mut self, other: Self) -> Result<()> {
54/// if other.dim() != self.dim() || other.metric() != self.metric() {
55/// return Err(IqdbError::InvalidConfig { reason: "merge shape mismatch" });
56/// }
57/// for (id, vector) in other.rows {
58/// self.insert(id, vector, None)?; // re-checks duplicates
59/// }
60/// Ok(())
61/// }
62/// }
63/// # fn main() -> iqdb_types::Result<()> {
64/// let mut a = Flat::new(1, DistanceMetric::Euclidean, FlatConfig::default())?;
65/// a.insert(VectorId::from(1u64), Arc::from([0.0_f32].as_slice()), None)?;
66/// let mut b = Flat::new(1, DistanceMetric::Euclidean, FlatConfig::default())?;
67/// b.insert(VectorId::from(2u64), Arc::from([1.0_f32].as_slice()), None)?;
68///
69/// a.merge(b)?;
70/// assert_eq!(a.len(), 2);
71/// # Ok(()) }
72/// ```
pub trait Mergeable: Index {
    /// Absorb `other` into `self`, leaving `self` searchable over the union of
    /// both. See the [trait contract](Mergeable#contract) for the guarantees and
    /// failure modes.
    ///
    /// `other` is taken by value, so the call consumes it whether or not the
    /// merge succeeds.
    ///
    /// # Errors
    ///
    /// Per the trait contract, an implementation should return
    /// [`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig) when `other`'s
    /// `dim` or `metric` differs from `self`'s, and
    /// [`Duplicate`](iqdb_types::IqdbError::Duplicate) when both inputs hold the
    /// same id. The operation is not transactional: on `Err`, `self` may be
    /// left partially merged.
    fn merge(&mut self, other: Self) -> Result<()>;
}
79
80/// Fold a collection of sub-indexes into one by merging them in order.
81///
82/// Returns `Ok(None)` for an empty input, and otherwise merges every later index
83/// into the first and returns it. This is the natural companion to
84/// [`IndexBuilder::build_parallel`]: build shards, then `merge` them.
85///
86/// # Errors
87///
88/// Propagates the first [`Mergeable::merge`] error (for example
89/// [`Duplicate`](iqdb_types::IqdbError::Duplicate) on a cross-shard id
90/// collision). Because `merge` is not transactional, a failure partway leaves
91/// the accumulator partially merged; the error is returned and the partial
92/// accumulator dropped.
93///
94/// # Examples
95///
96/// ```
97/// # use std::sync::Arc;
98/// # use iqdb_build::{IndexBuilder, Mergeable, merge};
99/// # use iqdb_types::{DistanceMetric, IqdbError, VectorId};
100/// # use iqdb_index::{Index, IndexCore, IndexStats};
101/// # use iqdb_types::{Hit, Metadata, Result, SearchParams};
102/// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
103/// # #[derive(Clone, Default)] struct FlatConfig;
104/// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
105/// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{Ok(Flat{dim,metric,rows:Vec::new()})} }
106/// # impl Mergeable for Flat { fn merge(&mut self, other: Self) -> Result<()> { for (id, v) in other.rows { self.insert(id, v, None)?; } Ok(()) } }
107/// # fn main() -> iqdb_types::Result<()> {
108/// let items: Vec<_> = (0u64..100).map(|i| (VectorId::from(i), Arc::from([i as f32].as_slice()), None)).collect();
109/// let shards: Vec<Flat> = IndexBuilder::new(1, DistanceMetric::Euclidean).with_shards(4).build_parallel(items)?;
110///
111/// let one = merge(shards)?.expect("non-empty");
112/// assert_eq!(one.len(), 100);
113/// # Ok(()) }
114/// ```
115pub fn merge<I: Mergeable>(indexes: Vec<I>) -> Result<Option<I>> {
116 let mut iter = indexes.into_iter();
117 let Some(mut accumulator) = iter.next() else {
118 return Ok(None);
119 };
120 for next in iter {
121 accumulator.merge(next)?;
122 }
123 Ok(Some(accumulator))
124}
125
126impl<I: Mergeable> IndexBuilder<I> {
127 /// Build the input in parallel and merge the shards into a single index.
128 ///
129 /// This is the full bulk pipeline — *split → build in parallel → merge* — in
130 /// one call. It runs [`build_parallel`](IndexBuilder::build_parallel) (so the
131 /// shard count, [`with_shards`](IndexBuilder::with_shards), and any
132 /// [`on_progress`](IndexBuilder::on_progress) callback all apply), then folds
133 /// the shards with [`merge`].
134 ///
135 /// # Errors
136 ///
137 /// Any error from the parallel build ([`InvalidConfig`](iqdb_types::IqdbError::InvalidConfig),
138 /// [`DimensionMismatch`](iqdb_types::IqdbError::DimensionMismatch), within-shard
139 /// [`Duplicate`](iqdb_types::IqdbError::Duplicate)) or from
140 /// [`Mergeable::merge`] (notably a cross-shard `Duplicate`).
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use std::sync::Arc;
146 /// use iqdb_build::{IndexBuilder, Mergeable};
147 /// use iqdb_types::{DistanceMetric, VectorId};
148 /// # use iqdb_index::{Index, IndexCore, IndexStats};
149 /// # use iqdb_types::{Hit, IqdbError, Metadata, Result, SearchParams};
150 /// # #[derive(Clone)] struct Flat { dim: usize, metric: DistanceMetric, rows: Vec<(VectorId, Arc<[f32]>)> }
151 /// # #[derive(Clone, Default)] struct FlatConfig;
152 /// # impl IndexCore for Flat { fn insert(&mut self, id: VectorId, v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> { if v.len()!=self.dim {return Err(IqdbError::DimensionMismatch{expected:self.dim,found:v.len()});} if self.rows.iter().any(|(e,_)|e==&id){return Err(IqdbError::Duplicate);} self.rows.push((id,v)); Ok(()) } fn delete(&mut self,_i:&VectorId)->Result<()>{Ok(())} fn search(&self,_q:&[f32],_p:&SearchParams)->Result<Vec<Hit>>{Ok(Vec::new())} fn len(&self)->usize{self.rows.len()} fn dim(&self)->usize{self.dim} fn metric(&self)->DistanceMetric{self.metric} fn flush(&mut self)->Result<()>{Ok(())} fn stats(&self)->IndexStats{IndexStats::default()} }
153 /// # impl Index for Flat { type Config = FlatConfig; fn new(dim:usize,metric:DistanceMetric,_c:Self::Config)->Result<Self>{ if dim==0 {return Err(IqdbError::InvalidConfig{reason:"dim must be > 0"});} Ok(Flat{dim,metric,rows:Vec::new()}) } }
154 /// # impl Mergeable for Flat { fn merge(&mut self, other: Self) -> Result<()> { for (id, v) in other.rows { self.insert(id, v, None)?; } Ok(()) } }
155 /// # fn main() -> iqdb_types::Result<()> {
156 /// let items: Vec<_> = (0u64..1_000)
157 /// .map(|i| (VectorId::from(i), Arc::from([i as f32, 0.0].as_slice()), None))
158 /// .collect();
159 ///
160 /// // One call: shard, build across cores, merge back to a single index.
161 /// let index: Flat = IndexBuilder::new(2, DistanceMetric::Euclidean)
162 /// .with_shards(8)
163 /// .build_merged(items)?;
164 ///
165 /// assert_eq!(index.len(), 1_000);
166 /// # Ok(()) }
167 /// ```
168 pub fn build_merged<It>(&self, items: It) -> Result<I>
169 where
170 It: IntoIterator<Item = BuildItem>,
171 I::Config: Send + Sync,
172 {
173 let shards = self.build_parallel(items)?;
174 match merge(shards)? {
175 Some(index) => Ok(index),
176 // `build_parallel` always yields at least one shard, so this arm is
177 // unreachable in practice; constructing an empty index keeps the
178 // function total without an `unwrap`/`expect`.
179 None => I::new(self.dim, self.metric, self.config.clone()),
180 }
181 }
182}