Skip to main content

oxirs_vec/
index_dispatcher.rs

1//! Runtime index dispatcher (wraps the optimizer brain with concrete indices).
2//!
3//! This is the public entry point for the cost-based optimizer described in
4//! the `0.3.0` "Advanced query optimization" milestone:
5//!
6//! * The dispatcher owns one instance of each enabled index family
7//!   (HNSW / IVF / LSH / PQ).
8//! * It delegates "which family should I pick?" to
9//!   [`crate::optimizer::OptimizerDispatcher`] (a cost-model + stats brain).
//! * It executes the query against the picked family, observes the actual
//!   latency and result count, and feeds an observation back to the brain.
//!   When the picked family returns no results, the query is re-issued
//!   against the brain's next-best fallback families (at most
//!   `DispatcherConfig::max_fallbacks` of them) until one returns results.
14//!
15//! The split between "brain" (in `optimizer/`) and "wrapper" (this file)
16//! mirrors the split between `query_planning.rs` (brain-only) and
17//! `dynamic_index_selector.rs` (wrapper) but uses the new cost-model formulas
18//! and the persisted `QueryStats`.
19//!
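//! Persistence: when a stats path is configured, the dispatcher reloads any
//! previously saved [`QueryStats`] on construction and re-saves them every
//! `stats_save_interval` observations, so online learning survives process
//! restarts.  Set [`IndexDispatcherConfig::stats_path`] or use
//! [`IndexDispatcher::with_stats_path`] to enable persistence.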
23
24use crate::hnsw::{HnswConfig, HnswIndex};
25use crate::ivf::{IvfConfig, IvfIndex};
26use crate::lsh::{LshConfig, LshIndex};
27use crate::optimizer::cost_model::{CostModel, IndexFamily, IndexParameters, WorkloadProfile};
28use crate::optimizer::index_dispatcher::{DispatchPlan, DispatcherConfig, OptimizerDispatcher};
29use crate::optimizer::query_stats::{QueryObservation, QueryStats};
30use crate::pq::{PQConfig, PQIndex};
31use crate::{Vector, VectorIndex};
32use anyhow::{anyhow, Result};
33use std::path::PathBuf;
34use std::sync::{Arc, RwLock};
35use std::time::Instant;
36use tracing::{debug, warn};
37
/// User-facing configuration for the runtime index dispatcher.
///
/// Note that [`IndexDispatcher::build`] materialises *every* index family
/// from these configs. The per-family configs are not only used when the
/// optimizer happens to pick that family.
#[derive(Debug, Clone)]
pub struct IndexDispatcherConfig {
    /// Cost-model formula parameters handed to the optimizer's `CostModel`.
    pub parameters: IndexParameters,
    /// Brain-level dispatcher config.
    ///
    /// The dispatcher reads `recall_fallback_threshold` (target recall for
    /// planning) and `max_fallbacks` (cap on re-issues after an empty result).
    pub dispatcher: DispatcherConfig,
    /// Configuration for the HNSW index built by `build()`.
    pub hnsw_config: HnswConfig,
    /// Configuration for the IVF index built by `build()`.
    ///
    /// `n_clusters` is clamped at build time to the training-sample size,
    /// because k-means cannot produce more clusters than training points.
    pub ivf_config: IvfConfig,
    /// Configuration for the LSH index built by `build()`.
    pub lsh_config: LshConfig,
    /// Configuration for the PQ index built by `build()`.
    ///
    /// `n_subquantizers` is reduced at build time to a divisor of the vector
    /// dimensionality when it does not already divide it.
    pub pq_config: PQConfig,
    /// File on disk where [`QueryStats`] is persisted (`None` disables it).
    ///
    /// If the file exists when the dispatcher is constructed, its contents
    /// seed the optimizer's statistics.
    pub stats_path: Option<PathBuf>,
    /// Rate limit for stats IO.
    ///
    /// The stats file is saved whenever the total observation count (which
    /// includes observations loaded from disk) is a multiple of this value.
    /// A value of `0` is treated as `1`.
    pub stats_save_interval: u64,
}
58
59impl Default for IndexDispatcherConfig {
60    fn default() -> Self {
61        Self {
62            parameters: IndexParameters::default(),
63            dispatcher: DispatcherConfig::default(),
64            hnsw_config: HnswConfig::default(),
65            ivf_config: IvfConfig::default(),
66            lsh_config: LshConfig::default(),
67            pq_config: PQConfig::default(),
68            stats_path: None,
69            stats_save_interval: 256,
70        }
71    }
72}
73
/// Outcome of a single [`IndexDispatcher::search_knn_with_plan`] call.
///
/// Carries the k-NN hits together with the dispatch metadata needed for
/// observability.
#[derive(Debug, Clone)]
pub struct DispatchedSearch {
    /// k-NN result list returned by the family that served the query.
    ///
    /// Each entry pairs an inserted URI with the `f32` produced by that
    /// index's `search_knn`. NOTE(review): whether the value is a distance
    /// or a similarity depends on the index; confirm before relying on it.
    /// May be empty if every attempted family came back empty.
    pub results: Vec<(String, f32)>,
    /// Family that ultimately served the query (the primary, or the last
    /// fallback that was tried).
    pub served_by: IndexFamily,
    /// Plan generated by the optimizer brain at dispatch time.
    ///
    /// Contains the primary family, its predicted cost and the fallback
    /// candidates.
    pub plan: DispatchPlan,
    /// Number of fallback re-issues performed (0 = primary served).
    ///
    /// A non-zero value does not by itself mean a fallback succeeded; check
    /// `results`.
    pub fallback_attempts: usize,
    /// Wall-clock latency in microseconds of the index searches, including
    /// fallback retries. Time spent planning is not included.
    pub latency_us: f64,
}
88
89/// Runtime index dispatcher.
90///
91/// Workflow:
92///
93/// ```ignore
94/// let mut d = IndexDispatcher::new(config)?;
95/// d.insert("v1".into(), vector)?;   // buffered
96/// d.insert("v2".into(), vector)?;
97/// // ...
98/// d.build()?;                        // trains IVF + PQ, materialises HNSW + LSH
99/// let res = d.search_knn(&query, 10)?;
100/// ```
101///
102/// Insert-buffering is required because IVF and PQ both need a training
103/// pass over a representative sample before any vectors can be inserted.
104/// HNSW and LSH have no training phase but are also built during
105/// [`Self::build`] for symmetry.
106pub struct IndexDispatcher {
107    config: IndexDispatcherConfig,
108    brain: Arc<RwLock<OptimizerDispatcher>>,
109    /// Buffered insertions waiting for `build()` to materialise indices.
110    pending: Vec<(String, Vector)>,
111    hnsw: Option<HnswIndex>,
112    ivf: Option<IvfIndex>,
113    lsh: Option<LshIndex>,
114    pq: Option<PQIndex>,
115    /// Current vector count across the dispatcher (used for workload profile).
116    vector_count: usize,
117    /// Current vector dimensionality.
118    vector_dim: usize,
119    /// `true` once `build()` has been called and indices are searchable.
120    is_built: bool,
121}
122
123impl IndexDispatcher {
124    /// Build a fresh dispatcher.  No indices are created yet — call
125    /// [`Self::insert`] then [`Self::build`] (or rely on lazy build inside
126    /// [`Self::insert`] for in-memory indices that don't need a build phase).
127    pub fn new(config: IndexDispatcherConfig) -> Result<Self> {
128        let cost_model = CostModel::new(config.parameters.clone(), Default::default());
129
130        // Restore stats from disk if a path is configured and the file exists.
131        let stats = if let Some(path) = &config.stats_path {
132            if path.exists() {
133                match QueryStats::load(path) {
134                    Ok(s) => {
135                        debug!(
136                            "IndexDispatcher: loaded {} observations from {:?}",
137                            s.total_observations, path
138                        );
139                        s
140                    }
141                    Err(e) => {
142                        warn!(
143                            "IndexDispatcher: failed to load stats from {:?}: {} — starting fresh",
144                            path, e
145                        );
146                        QueryStats::default()
147                    }
148                }
149            } else {
150                QueryStats::default()
151            }
152        } else {
153            QueryStats::default()
154        };
155
156        let brain = OptimizerDispatcher::new(cost_model, stats, config.dispatcher.clone());
157
158        Ok(Self {
159            config,
160            brain: Arc::new(RwLock::new(brain)),
161            pending: Vec::new(),
162            hnsw: None,
163            ivf: None,
164            lsh: None,
165            pq: None,
166            vector_count: 0,
167            vector_dim: 0,
168            is_built: false,
169        })
170    }
171
172    /// Convenience: dispatcher with default config and a stats file path.
173    pub fn with_stats_path(stats_path: PathBuf) -> Result<Self> {
174        let config = IndexDispatcherConfig {
175            stats_path: Some(stats_path),
176            ..Default::default()
177        };
178        Self::new(config)
179    }
180
181    /// Buffer a vector for insertion.  Indices are not built until
182    /// [`Self::build`] is called.
183    ///
184    /// After `build()` completes, additional inserts go directly into the
185    /// already-trained HNSW and LSH indices; IVF and PQ skip them since they
186    /// would require re-training.
187    pub fn insert(&mut self, uri: String, vector: Vector) -> Result<()> {
188        if self.vector_dim == 0 {
189            self.vector_dim = vector.dimensions;
190        } else if vector.dimensions != self.vector_dim {
191            return Err(anyhow!(
192                "IndexDispatcher::insert: dim mismatch (have {}, got {})",
193                self.vector_dim,
194                vector.dimensions
195            ));
196        }
197
198        if !self.is_built {
199            self.pending.push((uri, vector));
200            return Ok(());
201        }
202
203        // Post-build inserts: only HNSW and LSH (no training needed).
204        if let Some(hnsw) = &mut self.hnsw {
205            hnsw.insert(uri.clone(), vector.clone())?;
206        }
207        if let Some(lsh) = &mut self.lsh {
208            lsh.insert(uri, vector)?;
209        }
210        self.vector_count += 1;
211        Ok(())
212    }
213
214    /// Materialise every enabled family from the buffered inserts.
215    ///
216    /// IVF and PQ are trained on the buffered set (or a 10 000-vector
217    /// sample, whichever is smaller); HNSW and LSH are bulk-inserted.
218    pub fn build(&mut self) -> Result<()> {
219        if self.is_built {
220            return Ok(());
221        }
222        if self.pending.is_empty() {
223            // Nothing to build — leave indices `None`; search will error.
224            self.is_built = true;
225            return Ok(());
226        }
227
228        // ── HNSW ────────────────────────────────────────────────────────────
229        let mut hnsw = HnswIndex::new(self.config.hnsw_config.clone())
230            .map_err(|e| anyhow!("IndexDispatcher::build: HnswIndex::new failed: {}", e))?;
231        for (uri, v) in &self.pending {
232            hnsw.insert(uri.clone(), v.clone())?;
233        }
234        self.hnsw = Some(hnsw);
235
236        // ── LSH ─────────────────────────────────────────────────────────────
237        let mut lsh = LshIndex::new(self.config.lsh_config.clone());
238        for (uri, v) in &self.pending {
239            lsh.insert(uri.clone(), v.clone())?;
240        }
241        self.lsh = Some(lsh);
242
243        // ── IVF (must train first, then insert) ─────────────────────────────
244        // Cap training set at 10k samples to keep build time bounded.
245        let sample_size = self.pending.len().min(10_000);
246        let training_set: Vec<Vector> = self
247            .pending
248            .iter()
249            .take(sample_size)
250            .map(|(_, v)| v.clone())
251            .collect();
252
253        let mut ivf_config = self.config.ivf_config.clone();
254        // Keep n_clusters ≤ training set size, otherwise k-means cannot run.
255        if ivf_config.n_clusters > sample_size {
256            ivf_config.n_clusters = sample_size.max(1);
257        }
258        let mut ivf = IvfIndex::new(ivf_config)?;
259        ivf.train(&training_set)?;
260        for (uri, v) in &self.pending {
261            ivf.insert(uri.clone(), v.clone())?;
262        }
263        self.ivf = Some(ivf);
264
265        // ── PQ (must train first, then insert) ──────────────────────────────
266        let pq_dim = self.pending[0].1.dimensions;
267        let mut pq_config = self.config.pq_config.clone();
268        // PQ requires `n_subquantizers` to divide `dim` exactly; clamp to
269        // the largest divisor ≤ the configured value.
270        if pq_dim % pq_config.n_subquantizers != 0 {
271            // Find largest k ≤ configured that divides pq_dim.
272            let mut k = pq_config.n_subquantizers.min(pq_dim).max(1);
273            while k > 1 && pq_dim % k != 0 {
274                k -= 1;
275            }
276            pq_config.n_subquantizers = k;
277        }
278        let mut pq = PQIndex::new(pq_config);
279        pq.train(&training_set)?;
280        for (uri, v) in &self.pending {
281            pq.insert(uri.clone(), v.clone())?;
282        }
283        self.pq = Some(pq);
284
285        self.vector_count = self.pending.len();
286        self.pending.clear();
287        self.is_built = true;
288        Ok(())
289    }
290
291    /// `true` once [`Self::build`] has been invoked.
292    pub fn is_built(&self) -> bool {
293        self.is_built
294    }
295
296    /// Number of vectors indexed.
297    pub fn len(&self) -> usize {
298        self.vector_count
299    }
300
301    /// `true` when no vectors are indexed.
302    pub fn is_empty(&self) -> bool {
303        self.vector_count == 0
304    }
305
306    /// Search the dispatcher with default workload (`k`, density=1.0,
307    /// recall=brain config).  This is the simple entry point.
308    pub fn search_knn(&self, query: &Vector, k: usize) -> Result<Vec<(String, f32)>> {
309        let dispatched = self.search_knn_with_plan(query, k, 1.0)?;
310        Ok(dispatched.results)
311    }
312
313    /// Search with explicit query density (filter selectivity).  Returns
314    /// the full [`DispatchedSearch`] for observability.
315    pub fn search_knn_with_plan(
316        &self,
317        query: &Vector,
318        k: usize,
319        query_density: f32,
320    ) -> Result<DispatchedSearch> {
321        if !self.is_built {
322            return Err(anyhow!(
323                "IndexDispatcher::search_knn: dispatcher must be built first (call build())"
324            ));
325        }
326        if self.vector_count == 0 {
327            return Err(anyhow!("IndexDispatcher::search_knn: no vectors indexed"));
328        }
329
330        let workload = WorkloadProfile::new(
331            self.vector_count,
332            self.vector_dim,
333            self.config.dispatcher.recall_fallback_threshold,
334        )
335        .with_query_density(query_density)
336        .with_k(k);
337
338        let plan = {
339            let brain = self
340                .brain
341                .read()
342                .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
343            brain
344                .pick_plan(&workload)
345                .map_err(|e| anyhow!("dispatcher brain failed to plan: {}", e))?
346        };
347
348        let start = Instant::now();
349        let mut current = plan.primary;
350        let mut fallback_attempts = 0;
351        let mut results = self.execute_with_family(current, query, k)?;
352
353        // Apply fallback policy: if results came up empty *and* the brain
354        // says we have fallback budget, retry with the next-best family.
355        let max_fallbacks = self.config.dispatcher.max_fallbacks;
356        let observed_recall_proxy = if results.is_empty() { 0.0 } else { 1.0 };
357        if max_fallbacks > 0 && results.is_empty() {
358            for next_family in plan.fallbacks.iter().map(|e| e.family).take(max_fallbacks) {
359                fallback_attempts += 1;
360                tracing::info!(
361                    "IndexDispatcher: empty result from {:?}, falling back to {:?}",
362                    current,
363                    next_family
364                );
365                current = next_family;
366                results = self.execute_with_family(current, query, k)?;
367                if !results.is_empty() {
368                    break;
369                }
370            }
371        }
372
373        let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
374
375        // Record observation for online learning.
376        let observation = QueryObservation::new(
377            current,
378            !results.is_empty(),
379            elapsed_us,
380            // We don't have ground truth; record the recall *proxy* (1.0 if hit, 0.0 else)
381            // so the stats reflect operational hit rate.
382            Some(observed_recall_proxy),
383            plan.primary_cost,
384        );
385        {
386            let mut brain = self
387                .brain
388                .write()
389                .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
390            let refreshed = brain.record_observation(observation);
391            if refreshed {
392                debug!("IndexDispatcher: refreshed cost-model weights");
393            }
394        }
395
396        // Optionally persist stats.
397        self.maybe_persist_stats()?;
398
399        Ok(DispatchedSearch {
400            results,
401            served_by: current,
402            plan,
403            fallback_attempts,
404            latency_us: elapsed_us,
405        })
406    }
407
408    /// Force-flush stats to disk now (no-op if no path configured).
409    pub fn flush_stats(&self) -> Result<()> {
410        if let Some(path) = &self.config.stats_path {
411            let brain = self
412                .brain
413                .read()
414                .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
415            brain.stats().save(path)?;
416        }
417        Ok(())
418    }
419
420    /// Snapshot of internal observation counts for diagnostics.
421    pub fn observation_count(&self) -> Result<u64> {
422        let brain = self
423            .brain
424            .read()
425            .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
426        Ok(brain.stats().total_observations)
427    }
428
429    fn execute_with_family(
430        &self,
431        family: IndexFamily,
432        query: &Vector,
433        k: usize,
434    ) -> Result<Vec<(String, f32)>> {
435        match family {
436            IndexFamily::Hnsw => self
437                .hnsw
438                .as_ref()
439                .map(|i| i.search_knn(query, k))
440                .unwrap_or_else(|| Err(anyhow!("HNSW family not built"))),
441            IndexFamily::Ivf => self
442                .ivf
443                .as_ref()
444                .map(|i| i.search_knn(query, k))
445                .unwrap_or_else(|| Err(anyhow!("IVF family not built"))),
446            IndexFamily::Lsh => self
447                .lsh
448                .as_ref()
449                .map(|i| i.search_knn(query, k))
450                .unwrap_or_else(|| Err(anyhow!("LSH family not built"))),
451            IndexFamily::Pq => self
452                .pq
453                .as_ref()
454                .map(|i| i.search_knn(query, k))
455                .unwrap_or_else(|| Err(anyhow!("PQ family not built"))),
456        }
457    }
458
459    fn maybe_persist_stats(&self) -> Result<()> {
460        if self.config.stats_path.is_none() {
461            return Ok(());
462        }
463        // Saves are throttled to once per `stats_save_interval` observations.
464        // We approximate this by checking the brain's observation counter.
465        let brain = self
466            .brain
467            .read()
468            .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
469        let total = brain.stats().total_observations;
470        let interval = self.config.stats_save_interval.max(1);
471        if total % interval == 0 && total > 0 {
472            if let Some(path) = &self.config.stats_path {
473                if let Err(e) = brain.stats().save(path) {
474                    warn!("IndexDispatcher: stats save to {:?} failed: {}", path, e);
475                }
476            }
477        }
478        Ok(())
479    }
480}
481
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Return a temp-dir stats path that is unique to this call.
    ///
    /// Tests run in parallel, and a nanosecond timestamp alone can repeat on
    /// platforms with a coarse clock. The process id and a per-process
    /// counter are therefore folded into the file name as well.
    fn unique_stats_path() -> PathBuf {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let stamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let mut p = temp_dir();
        p.push(format!(
            "oxirs_vec_dispatcher_{}_{}_{}.json",
            std::process::id(),
            seq,
            stamp
        ));
        p
    }

    /// Deletes the wrapped file when dropped.
    ///
    /// This cleans up stats files even when an assertion fails part-way
    /// through a test.
    struct RemoveOnDrop(PathBuf);

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Deterministic pseudo-random vector with components in [-1, 1].
    fn random_vec(seed: u64, dim: usize) -> Vector {
        let mut state = seed.wrapping_mul(2654435769).wrapping_add(0x9E37_79B9);
        let mut values = Vec::with_capacity(dim);
        for _ in 0..dim {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            let f = (state as f32) / (u64::MAX as f32);
            values.push((f - 0.5) * 2.0);
        }
        Vector::new(values)
    }

    /// Dispatcher config tuned for small test datasets.
    ///
    /// IVF needs far fewer clusters than the default when working with
    /// fewer than 50 vectors.
    fn small_test_config() -> IndexDispatcherConfig {
        let mut cfg = IndexDispatcherConfig::default();
        cfg.ivf_config.n_clusters = 4;
        cfg.ivf_config.n_probes = 2;
        cfg
    }

    #[test]
    fn dispatcher_can_insert_and_search() -> Result<()> {
        let mut d = IndexDispatcher::new(small_test_config())?;
        for i in 0..50 {
            d.insert(format!("v{}", i), random_vec(i as u64 + 1, 32))?;
        }
        d.build()?;
        let q = random_vec(1, 32);
        let results = d.search_knn(&q, 5)?;
        // Some non-empty subset of 5 results expected.
        assert!(!results.is_empty());
        Ok(())
    }

    #[test]
    fn search_with_plan_returns_metadata() -> Result<()> {
        let cfg = small_test_config();
        let max_fallbacks = cfg.dispatcher.max_fallbacks;
        let mut d = IndexDispatcher::new(cfg)?;
        for i in 0..50 {
            d.insert(format!("v{}", i), random_vec(i as u64 + 1, 16))?;
        }
        d.build()?;
        let q = random_vec(2, 16);
        let dispatched = d.search_knn_with_plan(&q, 4, 1.0)?;
        assert!(dispatched.latency_us >= 0.0);
        assert!(dispatched.fallback_attempts <= max_fallbacks);
        // The primary family must be the one that served when no fallback ran.
        if dispatched.fallback_attempts == 0 {
            assert_eq!(dispatched.plan.primary, dispatched.served_by);
        }
        Ok(())
    }

    #[test]
    fn dispatcher_persists_stats_to_disk() -> Result<()> {
        let path = unique_stats_path();
        let _cleanup = RemoveOnDrop(path.clone());
        let mut config = small_test_config();
        config.stats_path = Some(path.clone());
        config.stats_save_interval = 1; // Save after every observation
        let mut d = IndexDispatcher::new(config)?;
        for i in 0..16 {
            d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
        }
        d.build()?;
        let q = random_vec(99, 8);
        let _ = d.search_knn(&q, 3)?;
        assert!(path.exists(), "stats file must be created");
        let loaded = QueryStats::load(&path)?;
        assert!(loaded.total_observations >= 1);
        Ok(())
    }

    #[test]
    fn search_on_unbuilt_dispatcher_errors() -> Result<()> {
        let d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
        let q = random_vec(1, 4);
        let res = d.search_knn(&q, 3);
        assert!(res.is_err());
        Ok(())
    }

    #[test]
    fn mismatched_dim_errors() -> Result<()> {
        let mut d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
        d.insert("v1".into(), random_vec(1, 8))?;
        let res = d.insert("v2".into(), random_vec(2, 16));
        assert!(res.is_err(), "dim mismatch must error");
        Ok(())
    }

    #[test]
    fn flush_stats_no_op_without_path() -> Result<()> {
        let d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
        d.flush_stats()?;
        Ok(())
    }

    #[test]
    fn flush_stats_writes_file_when_path_set() -> Result<()> {
        let path = unique_stats_path();
        let _cleanup = RemoveOnDrop(path.clone());
        let d = IndexDispatcher::with_stats_path(path.clone())?;
        d.flush_stats()?;
        assert!(path.exists(), "flush must create the file");
        Ok(())
    }

    #[test]
    fn restart_loads_previous_stats() -> Result<()> {
        let path = unique_stats_path();
        let _cleanup = RemoveOnDrop(path.clone());
        // First run: produce some observations.
        {
            let mut config = small_test_config();
            config.stats_path = Some(path.clone());
            config.stats_save_interval = 1;
            let mut d = IndexDispatcher::new(config)?;
            for i in 0..16 {
                d.insert(format!("v{}", i), random_vec(i as u64 + 1, 4))?;
            }
            d.build()?;
            let q = random_vec(99, 4);
            let _ = d.search_knn(&q, 2)?;
            d.flush_stats()?;
        }
        // Second run: should reload observations.
        let mut config = small_test_config();
        config.stats_path = Some(path.clone());
        let d2 = IndexDispatcher::new(config)?;
        let n = d2.observation_count()?;
        assert!(n >= 1, "second run must load at least 1 observation");
        Ok(())
    }

    #[test]
    fn build_is_idempotent() -> Result<()> {
        let mut d = IndexDispatcher::new(small_test_config())?;
        for i in 0..16 {
            d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
        }
        d.build()?;
        // Calling build again is a no-op.
        d.build()?;
        assert!(d.is_built());
        Ok(())
    }

    #[test]
    fn post_build_inserts_go_to_hnsw_and_lsh() -> Result<()> {
        let mut d = IndexDispatcher::new(small_test_config())?;
        for i in 0..16 {
            d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
        }
        d.build()?;
        let pre = d.len();
        d.insert("late".into(), random_vec(999, 8))?;
        assert_eq!(d.len(), pre + 1);
        Ok(())
    }
}