use anyhow::Result;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
/// Tuning knobs for [`AdaptiveSearchTuner`], which adjusts the search
/// breadth parameter `ef_search` (presumably an HNSW-style candidate-list
/// size — confirm against the index implementation) at runtime.
#[derive(Debug, Clone)]
pub struct AdaptiveSearchConfig {
    /// Value of `ef_search` the tuner starts from (and resets to).
    pub initial_ef_search: usize,
    /// Lower clamp for `ef_search`; adaptation never goes below this.
    pub min_ef_search: usize,
    /// Upper clamp for `ef_search`; adaptation never goes above this.
    pub max_ef_search: usize,
    /// Desired average recall over the adaptation window (0.0–1.0).
    pub target_recall: f32,
    /// Desired average query latency, in milliseconds.
    pub target_latency_ms: f64,
    /// Recall may undershoot `target_recall` by this much before the
    /// tuner increases `ef_search`.
    pub recall_tolerance: f32,
    /// Latency may overshoot `target_latency_ms` by this fraction
    /// before the tuner decreases `ef_search`.
    pub latency_tolerance: f64,
    /// Maximum number of recent queries kept for computing averages.
    pub adaptation_window: usize,
    /// Minimum number of recorded queries before any adaptation happens.
    pub min_queries_for_adaptation: usize,
    /// Relative step size per adaptation (e.g. 0.1 = ±10%).
    pub adaptation_rate: f32,
    /// When true, increase/decrease steps are doubled.
    pub aggressive_mode: bool,
}
impl Default for AdaptiveSearchConfig {
fn default() -> Self {
Self {
initial_ef_search: 64,
min_ef_search: 16,
max_ef_search: 512,
target_recall: 0.95,
target_latency_ms: 10.0,
recall_tolerance: 0.02,
latency_tolerance: 0.2,
adaptation_window: 100,
min_queries_for_adaptation: 20,
adaptation_rate: 0.1,
aggressive_mode: false,
}
}
}
/// Per-query measurements fed into [`AdaptiveSearchTuner::record_query`].
#[derive(Debug, Clone)]
pub struct QueryMetrics {
    /// Wall-clock latency of the query, in milliseconds.
    pub latency_ms: f64,
    /// Measured recall for this query, if ground truth was available.
    pub recall: Option<f32>,
    /// Number of distance computations performed (recorded but not
    /// currently used by the adaptation logic).
    pub distance_computations: usize,
    /// The `ef_search` value that was in effect for this query.
    pub ef_search_used: usize,
    /// When the query completed.
    pub timestamp: Instant,
}
/// Aggregate view of tuner state, returned by [`AdaptiveSearchTuner::stats`].
#[derive(Debug, Clone)]
pub struct AdaptiveSearchStats {
    /// Total queries recorded since creation or the last reset.
    pub total_queries: usize,
    /// The `ef_search` value currently in effect.
    pub current_ef_search: usize,
    /// Mean latency (ms) over the current adaptation window.
    pub avg_latency_ms: f64,
    /// Mean recall over queries in the window that reported one;
    /// `None` until at least one query reports recall.
    pub avg_recall: Option<f32>,
    /// How many times `ef_search` has been changed by adaptation.
    pub adaptation_count: usize,
    /// Timestamp of the most recent adaptation, if any.
    pub last_adaptation: Option<Instant>,
    /// Combined quality score in [0, 1]: 70% recall, 30% latency ratio.
    pub performance_score: f32,
}
/// Feedback controller that tunes `ef_search` from observed query
/// latency/recall. Shared state is behind `Arc<RwLock<_>>` so the tuner
/// can be used from multiple threads via `&self` methods.
pub struct AdaptiveSearchTuner {
    config: AdaptiveSearchConfig,
    // The live ef_search value handed out to searches.
    current_ef_search: Arc<RwLock<usize>>,
    // Sliding window of the most recent `adaptation_window` queries.
    query_history: Arc<RwLock<VecDeque<QueryMetrics>>>,
    // Aggregates derived from the window, plus adaptation bookkeeping.
    stats: Arc<RwLock<AdaptiveSearchStats>>,
}
impl AdaptiveSearchTuner {
    /// Creates a tuner starting at `config.initial_ef_search` with empty
    /// history and neutral statistics.
    pub fn new(config: AdaptiveSearchConfig) -> Self {
        let initial_ef = config.initial_ef_search;
        Self {
            config,
            current_ef_search: Arc::new(RwLock::new(initial_ef)),
            query_history: Arc::new(RwLock::new(VecDeque::new())),
            stats: Arc::new(RwLock::new(AdaptiveSearchStats {
                total_queries: 0,
                current_ef_search: initial_ef,
                avg_latency_ms: 0.0,
                avg_recall: None,
                adaptation_count: 0,
                last_adaptation: None,
                performance_score: 0.5,
            })),
        }
    }

    /// Returns the `ef_search` value to use for the next query.
    pub fn get_ef_search(&self) -> usize {
        *self.current_ef_search.read()
    }

    /// Records one query's metrics, refreshes window statistics, and —
    /// once `min_queries_for_adaptation` samples exist — adapts
    /// `ef_search` toward the configured recall/latency targets.
    ///
    /// Lock order here is history → stats → current_ef_search; every
    /// other method acquires at most one lock at a time, so no ordering
    /// conflict (and hence no deadlock) is possible.
    pub fn record_query(&self, metrics: QueryMetrics) -> Result<()> {
        let mut history = self.query_history.write();
        let mut stats = self.stats.write();
        // `metrics` is not used after this point, so move it in directly
        // instead of cloning (the original cloned needlessly).
        history.push_back(metrics);
        stats.total_queries += 1;
        // Trim to the sliding window of the most recent queries.
        while history.len() > self.config.adaptation_window {
            history.pop_front();
        }
        self.update_statistics(&mut stats, &history);
        if history.len() >= self.config.min_queries_for_adaptation {
            self.adapt_ef_search_internal(&mut stats, &history)?;
        }
        Ok(())
    }

    /// Recomputes window averages and the blended performance score.
    fn update_statistics(&self, stats: &mut AdaptiveSearchStats, history: &VecDeque<QueryMetrics>) {
        if history.is_empty() {
            return;
        }
        let sum_latency: f64 = history.iter().map(|m| m.latency_ms).sum();
        stats.avg_latency_ms = sum_latency / history.len() as f64;
        // Only queries that reported recall contribute to the average.
        // NOTE(review): if the current window contains no recall samples,
        // avg_recall keeps its previous value rather than resetting to
        // None — intentional-looking, but worth confirming.
        let recalls: Vec<f32> = history.iter().filter_map(|m| m.recall).collect();
        if !recalls.is_empty() {
            let sum_recall: f32 = recalls.iter().sum();
            stats.avg_recall = Some(sum_recall / recalls.len() as f32);
        }
        // Blend: 70% recall (optimistic 0.8 default when unmeasured),
        // 30% latency as a ratio of target to actual, capped at 1.0.
        let recall_score = stats.avg_recall.unwrap_or(0.8);
        let latency_ratio = self.config.target_latency_ms / stats.avg_latency_ms.max(0.001);
        let latency_score = latency_ratio.min(1.0);
        stats.performance_score = (0.7 * recall_score + 0.3 * latency_score as f32).min(1.0);
        stats.current_ef_search = *self.current_ef_search.read();
    }

    /// Core adaptation policy: raise `ef_search` when recall is short of
    /// target, lower it when recall is fine but latency is high, and
    /// probe gently downward when both targets are comfortably met.
    fn adapt_ef_search_internal(
        &self,
        stats: &mut AdaptiveSearchStats,
        _history: &VecDeque<QueryMetrics>,
    ) -> Result<()> {
        let mut current_ef = self.current_ef_search.write();
        let avg_latency = stats.avg_latency_ms;
        let avg_recall = stats.avg_recall;
        let recall_too_low = avg_recall
            .is_some_and(|r| r < self.config.target_recall - self.config.recall_tolerance);
        // With no recall measurements we assume recall is sufficient and
        // tune on latency alone.
        let recall_sufficient = match avg_recall {
            Some(r) => r >= self.config.target_recall,
            None => true,
        };
        let latency_too_high =
            avg_latency > self.config.target_latency_ms * (1.0 + self.config.latency_tolerance);
        let latency_acceptable = avg_latency <= self.config.target_latency_ms;
        let new_ef = if recall_too_low {
            let factor = if self.config.aggressive_mode {
                1.0 + 2.0 * self.config.adaptation_rate
            } else {
                1.0 + self.config.adaptation_rate
            };
            // Guarantee progress of at least +1: with a small ef or a
            // small adaptation_rate, float-to-usize truncation could
            // otherwise round the increase back to the current value and
            // leave the tuner stuck below target recall forever.
            ((*current_ef as f32 * factor) as usize)
                .max(*current_ef + 1)
                .min(self.config.max_ef_search)
        } else if recall_sufficient && latency_too_high {
            let factor = if self.config.aggressive_mode {
                1.0 - 2.0 * self.config.adaptation_rate
            } else {
                1.0 - self.config.adaptation_rate
            };
            ((*current_ef as f32 * factor) as usize).max(self.config.min_ef_search)
        } else if recall_sufficient && latency_acceptable {
            // Everything is healthy: probe downward at half rate to find
            // the cheapest ef_search that still meets the targets.
            let decrease =
                (*current_ef as f32 * (1.0 - 0.5 * self.config.adaptation_rate)) as usize;
            decrease.max(self.config.min_ef_search)
        } else {
            *current_ef
        };
        if new_ef != *current_ef {
            tracing::debug!(
                "Adapting ef_search: {} -> {} (recall: {:?}, latency: {:.2}ms)",
                *current_ef,
                new_ef,
                avg_recall,
                avg_latency
            );
            *current_ef = new_ef;
            stats.current_ef_search = new_ef;
            stats.adaptation_count += 1;
            stats.last_adaptation = Some(Instant::now());
        }
        Ok(())
    }

    /// Returns a snapshot of the current statistics.
    pub fn stats(&self) -> AdaptiveSearchStats {
        self.stats.read().clone()
    }

    /// Restores the tuner to its freshly-constructed state.
    ///
    /// Each lock is taken and released in its own statement — never two
    /// at once — so this cannot deadlock against `record_query` (the
    /// original acquired current_ef_search → history → stats while
    /// record_query acquires history → stats → current_ef_search, a
    /// lock-order inversion).
    pub fn reset(&self) {
        *self.current_ef_search.write() = self.config.initial_ef_search;
        self.query_history.write().clear();
        *self.stats.write() = AdaptiveSearchStats {
            total_queries: 0,
            current_ef_search: self.config.initial_ef_search,
            avg_latency_ms: 0.0,
            avg_recall: None,
            adaptation_count: 0,
            last_adaptation: None,
            performance_score: 0.5,
        };
    }

    /// Manually overrides `ef_search`, clamped to the configured bounds.
    ///
    /// The two locks are taken sequentially (the ef guard is dropped
    /// before stats is locked) to avoid the original's lock-order
    /// inversion with `record_query`, which could deadlock under
    /// concurrent use.
    pub fn set_ef_search(&self, ef_search: usize) {
        let clamped = ef_search.clamp(self.config.min_ef_search, self.config.max_ef_search);
        *self.current_ef_search.write() = clamped;
        self.stats.write().current_ef_search = clamped;
    }

    /// Returns the configuration this tuner was built with.
    pub fn config(&self) -> &AdaptiveSearchConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A tuner built from defaults starts at the default initial ef.
    #[test]
    fn test_adaptive_search_creation() {
        let tuner = AdaptiveSearchTuner::new(AdaptiveSearchConfig::default());
        assert_eq!(tuner.get_ef_search(), 64);
    }

    // Recording a single query bumps the total counter.
    #[test]
    fn test_record_query() {
        let tuner = AdaptiveSearchTuner::new(AdaptiveSearchConfig::default());
        let sample = QueryMetrics {
            latency_ms: 5.0,
            recall: Some(0.95),
            distance_computations: 100,
            ef_search_used: 64,
            timestamp: Instant::now(),
        };
        assert!(tuner.record_query(sample).is_ok());
        assert_eq!(tuner.stats().total_queries, 1);
    }

    // Fast queries with recall well below target should drive ef upward.
    #[test]
    fn test_adaptation_on_low_recall() -> Result<()> {
        let tuner = AdaptiveSearchTuner::new(AdaptiveSearchConfig {
            min_queries_for_adaptation: 5,
            target_recall: 0.95,
            initial_ef_search: 32,
            ..Default::default()
        });
        for _ in 0..10 {
            tuner.record_query(QueryMetrics {
                latency_ms: 2.0,
                recall: Some(0.80),
                distance_computations: 50,
                ef_search_used: 32,
                timestamp: Instant::now(),
            })?;
        }
        assert!(tuner.get_ef_search() > 32);
        Ok(())
    }

    // High-recall but slow queries should drive ef downward.
    #[test]
    fn test_adaptation_on_high_latency() -> Result<()> {
        let tuner = AdaptiveSearchTuner::new(AdaptiveSearchConfig {
            min_queries_for_adaptation: 5,
            target_latency_ms: 5.0,
            target_recall: 0.90,
            initial_ef_search: 128,
            ..Default::default()
        });
        for _ in 0..10 {
            tuner.record_query(QueryMetrics {
                latency_ms: 15.0,
                recall: Some(0.98),
                distance_computations: 200,
                ef_search_used: 128,
                timestamp: Instant::now(),
            })?;
        }
        assert!(tuner.get_ef_search() < 128);
        Ok(())
    }

    // A manual override within bounds takes effect immediately.
    #[test]
    fn test_manual_override() {
        let tuner = AdaptiveSearchTuner::new(AdaptiveSearchConfig::default());
        tuner.set_ef_search(200);
        assert_eq!(tuner.get_ef_search(), 200);
    }

    // Reset restores the initial ef and clears the query counter.
    #[test]
    fn test_reset() {
        let config = AdaptiveSearchConfig::default();
        let expected_ef = config.initial_ef_search;
        let tuner = AdaptiveSearchTuner::new(config);
        tuner.set_ef_search(200);
        tuner.reset();
        assert_eq!(tuner.get_ef_search(), expected_ef);
        assert_eq!(tuner.stats().total_queries, 0);
    }
}