1use crate::hnsw::{HnswConfig, HnswIndex};
25use crate::ivf::{IvfConfig, IvfIndex};
26use crate::lsh::{LshConfig, LshIndex};
27use crate::optimizer::cost_model::{CostModel, IndexFamily, IndexParameters, WorkloadProfile};
28use crate::optimizer::index_dispatcher::{DispatchPlan, DispatcherConfig, OptimizerDispatcher};
29use crate::optimizer::query_stats::{QueryObservation, QueryStats};
30use crate::pq::{PQConfig, PQIndex};
31use crate::{Vector, VectorIndex};
32use anyhow::{anyhow, Result};
33use std::path::PathBuf;
34use std::sync::{Arc, RwLock};
35use std::time::Instant;
36use tracing::{debug, warn};
37
/// Configuration for [`IndexDispatcher`].
///
/// Bundles the cost-model parameters, the planner settings, one configuration
/// per index family, and the options controlling statistics persistence.
#[derive(Debug, Clone)]
pub struct IndexDispatcherConfig {
    /// Parameters handed to the cost model when the dispatcher is constructed.
    pub parameters: IndexParameters,
    /// Planner settings. Its `recall_fallback_threshold` and `max_fallbacks`
    /// fields are also read at search time.
    pub dispatcher: DispatcherConfig,
    /// Configuration used to construct the HNSW index during `build()`.
    pub hnsw_config: HnswConfig,
    /// Configuration used to construct the IVF index during `build()`.
    /// `n_clusters` is lowered at build time when it exceeds the size of the
    /// training sample.
    pub ivf_config: IvfConfig,
    /// Configuration used to construct the LSH index during `build()`.
    pub lsh_config: LshConfig,
    /// Configuration used to construct the PQ index during `build()`.
    /// `n_subquantizers` is lowered at build time when it does not divide the
    /// vector dimension.
    pub pq_config: PQConfig,
    /// File that query statistics are loaded from at construction and saved
    /// to afterwards. `None` disables persistence.
    pub stats_path: Option<PathBuf>,
    /// Statistics are saved after a search whenever the total observation
    /// count is a non-zero multiple of this value; values below 1 are treated
    /// as 1.
    pub stats_save_interval: u64,
}
58
59impl Default for IndexDispatcherConfig {
60 fn default() -> Self {
61 Self {
62 parameters: IndexParameters::default(),
63 dispatcher: DispatcherConfig::default(),
64 hnsw_config: HnswConfig::default(),
65 ivf_config: IvfConfig::default(),
66 lsh_config: LshConfig::default(),
67 pq_config: PQConfig::default(),
68 stats_path: None,
69 stats_save_interval: 256,
70 }
71 }
72}
73
/// Result of [`IndexDispatcher::search_knn_with_plan`]: the hits plus a record
/// of how the query was routed.
#[derive(Debug, Clone)]
pub struct DispatchedSearch {
    /// Hits returned by the index that served the query.
    /// NOTE(review): presumably `(uri, score-or-distance)` pairs — confirm
    /// the meaning of the `f32` against the index implementations.
    pub results: Vec<(String, f32)>,
    /// Index family that was executed last: the plan's primary, or the last
    /// fallback that was tried.
    pub served_by: IndexFamily,
    /// The plan chosen by the optimizer for this query.
    pub plan: DispatchPlan,
    /// Number of fallback families that were executed after the primary.
    pub fallback_attempts: usize,
    /// Wall-clock time, in microseconds, spent executing the primary family
    /// and any fallbacks. Planning and statistics recording are not included.
    pub latency_us: f64,
}
88
/// Front end that owns one index per family (HNSW, IVF, LSH, PQ) and routes
/// each query to the family chosen by an [`OptimizerDispatcher`].
///
/// Vectors inserted before `build()` are queued; `build()` then populates
/// every family from that queue.
pub struct IndexDispatcher {
    /// Configuration supplied at construction.
    config: IndexDispatcherConfig,
    /// Planner and query statistics, behind a lock so that `&self` search
    /// methods can record observations.
    brain: Arc<RwLock<OptimizerDispatcher>>,
    /// `(uri, vector)` pairs inserted before the indexes were built.
    pending: Vec<(String, Vector)>,
    /// HNSW index; `None` until a non-empty build has run.
    hnsw: Option<HnswIndex>,
    /// IVF index; `None` until a non-empty build has run.
    ivf: Option<IvfIndex>,
    /// LSH index; `None` until a non-empty build has run.
    lsh: Option<LshIndex>,
    /// PQ index; `None` until a non-empty build has run.
    pq: Option<PQIndex>,
    /// Number of indexed vectors. Vectors still waiting in `pending` are not
    /// counted.
    vector_count: usize,
    /// Dimension taken from the first inserted vector; 0 means not yet known.
    vector_dim: usize,
    /// Set once `build()` has completed.
    is_built: bool,
}
122
123impl IndexDispatcher {
124 pub fn new(config: IndexDispatcherConfig) -> Result<Self> {
128 let cost_model = CostModel::new(config.parameters.clone(), Default::default());
129
130 let stats = if let Some(path) = &config.stats_path {
132 if path.exists() {
133 match QueryStats::load(path) {
134 Ok(s) => {
135 debug!(
136 "IndexDispatcher: loaded {} observations from {:?}",
137 s.total_observations, path
138 );
139 s
140 }
141 Err(e) => {
142 warn!(
143 "IndexDispatcher: failed to load stats from {:?}: {} — starting fresh",
144 path, e
145 );
146 QueryStats::default()
147 }
148 }
149 } else {
150 QueryStats::default()
151 }
152 } else {
153 QueryStats::default()
154 };
155
156 let brain = OptimizerDispatcher::new(cost_model, stats, config.dispatcher.clone());
157
158 Ok(Self {
159 config,
160 brain: Arc::new(RwLock::new(brain)),
161 pending: Vec::new(),
162 hnsw: None,
163 ivf: None,
164 lsh: None,
165 pq: None,
166 vector_count: 0,
167 vector_dim: 0,
168 is_built: false,
169 })
170 }
171
172 pub fn with_stats_path(stats_path: PathBuf) -> Result<Self> {
174 let config = IndexDispatcherConfig {
175 stats_path: Some(stats_path),
176 ..Default::default()
177 };
178 Self::new(config)
179 }
180
181 pub fn insert(&mut self, uri: String, vector: Vector) -> Result<()> {
188 if self.vector_dim == 0 {
189 self.vector_dim = vector.dimensions;
190 } else if vector.dimensions != self.vector_dim {
191 return Err(anyhow!(
192 "IndexDispatcher::insert: dim mismatch (have {}, got {})",
193 self.vector_dim,
194 vector.dimensions
195 ));
196 }
197
198 if !self.is_built {
199 self.pending.push((uri, vector));
200 return Ok(());
201 }
202
203 if let Some(hnsw) = &mut self.hnsw {
205 hnsw.insert(uri.clone(), vector.clone())?;
206 }
207 if let Some(lsh) = &mut self.lsh {
208 lsh.insert(uri, vector)?;
209 }
210 self.vector_count += 1;
211 Ok(())
212 }
213
214 pub fn build(&mut self) -> Result<()> {
219 if self.is_built {
220 return Ok(());
221 }
222 if self.pending.is_empty() {
223 self.is_built = true;
225 return Ok(());
226 }
227
228 let mut hnsw = HnswIndex::new(self.config.hnsw_config.clone())
230 .map_err(|e| anyhow!("IndexDispatcher::build: HnswIndex::new failed: {}", e))?;
231 for (uri, v) in &self.pending {
232 hnsw.insert(uri.clone(), v.clone())?;
233 }
234 self.hnsw = Some(hnsw);
235
236 let mut lsh = LshIndex::new(self.config.lsh_config.clone());
238 for (uri, v) in &self.pending {
239 lsh.insert(uri.clone(), v.clone())?;
240 }
241 self.lsh = Some(lsh);
242
243 let sample_size = self.pending.len().min(10_000);
246 let training_set: Vec<Vector> = self
247 .pending
248 .iter()
249 .take(sample_size)
250 .map(|(_, v)| v.clone())
251 .collect();
252
253 let mut ivf_config = self.config.ivf_config.clone();
254 if ivf_config.n_clusters > sample_size {
256 ivf_config.n_clusters = sample_size.max(1);
257 }
258 let mut ivf = IvfIndex::new(ivf_config)?;
259 ivf.train(&training_set)?;
260 for (uri, v) in &self.pending {
261 ivf.insert(uri.clone(), v.clone())?;
262 }
263 self.ivf = Some(ivf);
264
265 let pq_dim = self.pending[0].1.dimensions;
267 let mut pq_config = self.config.pq_config.clone();
268 if pq_dim % pq_config.n_subquantizers != 0 {
271 let mut k = pq_config.n_subquantizers.min(pq_dim).max(1);
273 while k > 1 && pq_dim % k != 0 {
274 k -= 1;
275 }
276 pq_config.n_subquantizers = k;
277 }
278 let mut pq = PQIndex::new(pq_config);
279 pq.train(&training_set)?;
280 for (uri, v) in &self.pending {
281 pq.insert(uri.clone(), v.clone())?;
282 }
283 self.pq = Some(pq);
284
285 self.vector_count = self.pending.len();
286 self.pending.clear();
287 self.is_built = true;
288 Ok(())
289 }
290
    /// Returns `true` once `build()` has completed.
    pub fn is_built(&self) -> bool {
        self.is_built
    }
295
    /// Returns the number of indexed vectors. Vectors that are queued but not
    /// yet built are not included.
    pub fn len(&self) -> usize {
        self.vector_count
    }
300
301 pub fn is_empty(&self) -> bool {
303 self.vector_count == 0
304 }
305
306 pub fn search_knn(&self, query: &Vector, k: usize) -> Result<Vec<(String, f32)>> {
309 let dispatched = self.search_knn_with_plan(query, k, 1.0)?;
310 Ok(dispatched.results)
311 }
312
313 pub fn search_knn_with_plan(
316 &self,
317 query: &Vector,
318 k: usize,
319 query_density: f32,
320 ) -> Result<DispatchedSearch> {
321 if !self.is_built {
322 return Err(anyhow!(
323 "IndexDispatcher::search_knn: dispatcher must be built first (call build())"
324 ));
325 }
326 if self.vector_count == 0 {
327 return Err(anyhow!("IndexDispatcher::search_knn: no vectors indexed"));
328 }
329
330 let workload = WorkloadProfile::new(
331 self.vector_count,
332 self.vector_dim,
333 self.config.dispatcher.recall_fallback_threshold,
334 )
335 .with_query_density(query_density)
336 .with_k(k);
337
338 let plan = {
339 let brain = self
340 .brain
341 .read()
342 .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
343 brain
344 .pick_plan(&workload)
345 .map_err(|e| anyhow!("dispatcher brain failed to plan: {}", e))?
346 };
347
348 let start = Instant::now();
349 let mut current = plan.primary;
350 let mut fallback_attempts = 0;
351 let mut results = self.execute_with_family(current, query, k)?;
352
353 let max_fallbacks = self.config.dispatcher.max_fallbacks;
356 let observed_recall_proxy = if results.is_empty() { 0.0 } else { 1.0 };
357 if max_fallbacks > 0 && results.is_empty() {
358 for next_family in plan.fallbacks.iter().map(|e| e.family).take(max_fallbacks) {
359 fallback_attempts += 1;
360 tracing::info!(
361 "IndexDispatcher: empty result from {:?}, falling back to {:?}",
362 current,
363 next_family
364 );
365 current = next_family;
366 results = self.execute_with_family(current, query, k)?;
367 if !results.is_empty() {
368 break;
369 }
370 }
371 }
372
373 let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
374
375 let observation = QueryObservation::new(
377 current,
378 !results.is_empty(),
379 elapsed_us,
380 Some(observed_recall_proxy),
383 plan.primary_cost,
384 );
385 {
386 let mut brain = self
387 .brain
388 .write()
389 .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
390 let refreshed = brain.record_observation(observation);
391 if refreshed {
392 debug!("IndexDispatcher: refreshed cost-model weights");
393 }
394 }
395
396 self.maybe_persist_stats()?;
398
399 Ok(DispatchedSearch {
400 results,
401 served_by: current,
402 plan,
403 fallback_attempts,
404 latency_us: elapsed_us,
405 })
406 }
407
408 pub fn flush_stats(&self) -> Result<()> {
410 if let Some(path) = &self.config.stats_path {
411 let brain = self
412 .brain
413 .read()
414 .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
415 brain.stats().save(path)?;
416 }
417 Ok(())
418 }
419
420 pub fn observation_count(&self) -> Result<u64> {
422 let brain = self
423 .brain
424 .read()
425 .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
426 Ok(brain.stats().total_observations)
427 }
428
429 fn execute_with_family(
430 &self,
431 family: IndexFamily,
432 query: &Vector,
433 k: usize,
434 ) -> Result<Vec<(String, f32)>> {
435 match family {
436 IndexFamily::Hnsw => self
437 .hnsw
438 .as_ref()
439 .map(|i| i.search_knn(query, k))
440 .unwrap_or_else(|| Err(anyhow!("HNSW family not built"))),
441 IndexFamily::Ivf => self
442 .ivf
443 .as_ref()
444 .map(|i| i.search_knn(query, k))
445 .unwrap_or_else(|| Err(anyhow!("IVF family not built"))),
446 IndexFamily::Lsh => self
447 .lsh
448 .as_ref()
449 .map(|i| i.search_knn(query, k))
450 .unwrap_or_else(|| Err(anyhow!("LSH family not built"))),
451 IndexFamily::Pq => self
452 .pq
453 .as_ref()
454 .map(|i| i.search_knn(query, k))
455 .unwrap_or_else(|| Err(anyhow!("PQ family not built"))),
456 }
457 }
458
459 fn maybe_persist_stats(&self) -> Result<()> {
460 if self.config.stats_path.is_none() {
461 return Ok(());
462 }
463 let brain = self
466 .brain
467 .read()
468 .map_err(|_| anyhow!("dispatcher brain RwLock poisoned"))?;
469 let total = brain.stats().total_observations;
470 let interval = self.config.stats_save_interval.max(1);
471 if total % interval == 0 && total > 0 {
472 if let Some(path) = &self.config.stats_path {
473 if let Err(e) = brain.stats().save(path) {
474 warn!("IndexDispatcher: stats save to {:?} failed: {}", path, e);
475 }
476 }
477 }
478 Ok(())
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485 use std::env::temp_dir;
486
487 fn unique_stats_path() -> PathBuf {
488 let stamp = std::time::SystemTime::now()
489 .duration_since(std::time::UNIX_EPOCH)
490 .map(|d| d.as_nanos())
491 .unwrap_or(0);
492 let mut p = temp_dir();
493 p.push(format!("oxirs_vec_dispatcher_{}.json", stamp));
494 p
495 }
496
497 fn random_vec(seed: u64, dim: usize) -> Vector {
498 let mut state = seed.wrapping_mul(2654435769).wrapping_add(0x9E37_79B9);
499 let mut values = Vec::with_capacity(dim);
500 for _ in 0..dim {
501 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
502 let f = (state as f32) / (u64::MAX as f32);
503 values.push((f - 0.5) * 2.0);
504 }
505 Vector::new(values)
506 }
507
508 fn small_test_config() -> IndexDispatcherConfig {
511 let mut cfg = IndexDispatcherConfig::default();
512 cfg.ivf_config.n_clusters = 4;
517 cfg.ivf_config.n_probes = 2;
518 cfg
519 }
520
521 #[test]
522 fn dispatcher_can_insert_and_search() -> Result<()> {
523 let mut d = IndexDispatcher::new(small_test_config())?;
524 for i in 0..50 {
525 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 32))?;
526 }
527 d.build()?;
528 let q = random_vec(1, 32);
529 let results = d.search_knn(&q, 5)?;
530 assert!(!results.is_empty());
532 Ok(())
533 }
534
535 #[test]
536 fn search_with_plan_returns_metadata() -> Result<()> {
537 let mut d = IndexDispatcher::new(small_test_config())?;
538 for i in 0..50 {
539 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 16))?;
540 }
541 d.build()?;
542 let q = random_vec(2, 16);
543 let dispatched = d.search_knn_with_plan(&q, 4, 1.0)?;
544 assert!(dispatched.latency_us >= 0.0);
545 assert_eq!(dispatched.plan.primary, dispatched.served_by);
547 Ok(())
548 }
549
550 #[test]
551 fn dispatcher_persists_stats_to_disk() -> Result<()> {
552 let path = unique_stats_path();
553 let mut config = small_test_config();
554 config.stats_path = Some(path.clone());
555 config.stats_save_interval = 1; let mut d = IndexDispatcher::new(config)?;
557 for i in 0..16 {
558 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
559 }
560 d.build()?;
561 let q = random_vec(99, 8);
562 let _ = d.search_knn(&q, 3)?;
563 assert!(path.exists(), "stats file must be created");
564 let loaded = QueryStats::load(&path)?;
565 assert!(loaded.total_observations >= 1);
566 let _ = std::fs::remove_file(&path);
567 Ok(())
568 }
569
570 #[test]
571 fn search_on_unbuilt_dispatcher_errors() -> Result<()> {
572 let d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
573 let q = random_vec(1, 4);
574 let res = d.search_knn(&q, 3);
575 assert!(res.is_err());
576 Ok(())
577 }
578
579 #[test]
580 fn mismatched_dim_errors() -> Result<()> {
581 let mut d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
582 d.insert("v1".into(), random_vec(1, 8))?;
583 let res = d.insert("v2".into(), random_vec(2, 16));
584 assert!(res.is_err(), "dim mismatch must error");
585 Ok(())
586 }
587
588 #[test]
589 fn flush_stats_no_op_without_path() -> Result<()> {
590 let d = IndexDispatcher::new(IndexDispatcherConfig::default())?;
591 d.flush_stats()?;
592 Ok(())
593 }
594
595 #[test]
596 fn flush_stats_writes_file_when_path_set() -> Result<()> {
597 let path = unique_stats_path();
598 let d = IndexDispatcher::with_stats_path(path.clone())?;
599 d.flush_stats()?;
600 assert!(path.exists(), "flush must create the file");
601 let _ = std::fs::remove_file(&path);
602 Ok(())
603 }
604
605 #[test]
606 fn restart_loads_previous_stats() -> Result<()> {
607 let path = unique_stats_path();
608 {
610 let mut config = small_test_config();
611 config.stats_path = Some(path.clone());
612 config.stats_save_interval = 1;
613 let mut d = IndexDispatcher::new(config)?;
614 for i in 0..16 {
615 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 4))?;
616 }
617 d.build()?;
618 let q = random_vec(99, 4);
619 let _ = d.search_knn(&q, 2)?;
620 d.flush_stats()?;
621 }
622 let mut config = small_test_config();
624 config.stats_path = Some(path.clone());
625 let d2 = IndexDispatcher::new(config)?;
626 let n = d2.observation_count()?;
627 assert!(n >= 1, "second run must load at least 1 observation");
628 let _ = std::fs::remove_file(&path);
629 Ok(())
630 }
631
632 #[test]
633 fn build_is_idempotent() -> Result<()> {
634 let mut d = IndexDispatcher::new(small_test_config())?;
635 for i in 0..16 {
636 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
637 }
638 d.build()?;
639 d.build()?;
641 assert!(d.is_built());
642 Ok(())
643 }
644
645 #[test]
646 fn post_build_inserts_go_to_hnsw_and_lsh() -> Result<()> {
647 let mut d = IndexDispatcher::new(small_test_config())?;
648 for i in 0..16 {
649 d.insert(format!("v{}", i), random_vec(i as u64 + 1, 8))?;
650 }
651 d.build()?;
652 let pre = d.len();
653 d.insert("late".into(), random_vec(999, 8))?;
654 assert_eq!(d.len(), pre + 1);
655 Ok(())
656 }
657}