use super::delta::DeltaBuffer;
use crate::distance::DistanceMetric;
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHashSet;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Default value of `DeferredIndexerConfig::merge_threshold`: the pending-entry
/// count at which `push`/`extend`/`should_merge` start reporting that a merge is due.
const DEFAULT_MERGE_THRESHOLD: usize = 1024;
/// Default value of `DeferredIndexerConfig::max_buffer_age_ms` (milliseconds, per the field name).
// NOTE(review): this value is not read anywhere in this file; presumably the
// caller that schedules merges consumes it. Verify against the caller.
const DEFAULT_MAX_BUFFER_AGE_MS: u64 = 5000;
/// Serializable settings for [`DeferredIndexer`].
///
/// Every field has a serde default, so an empty document (`{}`) deserializes
/// to the same values as [`Default::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeferredIndexerConfig {
    /// Turns deferred indexing on. When `false`, `push` and `extend` are
    /// no-ops that return `false`. Defaults to `false`.
    #[serde(default)]
    pub enabled: bool,

    /// Number of pending entries at which the indexer reports that a merge
    /// is due. Defaults to `DEFAULT_MERGE_THRESHOLD`.
    #[serde(default = "default_merge_threshold")]
    pub merge_threshold: usize,

    /// Maximum buffer age in milliseconds. Defaults to
    /// `DEFAULT_MAX_BUFFER_AGE_MS`.
    // NOTE(review): not consulted by `DeferredIndexer` in this file;
    // presumably enforced by whoever schedules merges. Confirm.
    #[serde(default = "default_max_buffer_age_ms")]
    pub max_buffer_age_ms: u64,
}
/// Serde default provider for `DeferredIndexerConfig::merge_threshold`.
fn default_merge_threshold() -> usize {
    DEFAULT_MERGE_THRESHOLD
}
/// Serde default provider for `DeferredIndexerConfig::max_buffer_age_ms`.
fn default_max_buffer_age_ms() -> u64 {
    DEFAULT_MAX_BUFFER_AGE_MS
}
impl Default for DeferredIndexerConfig {
    /// Builds a disabled configuration carrying the module-level default
    /// merge threshold and buffer age.
    fn default() -> Self {
        let merge_threshold = DEFAULT_MERGE_THRESHOLD;
        let max_buffer_age_ms = DEFAULT_MAX_BUFFER_AGE_MS;
        Self {
            enabled: false,
            merge_threshold,
            max_buffer_age_ms,
        }
    }
}
/// Collects vectors in a [`DeltaBuffer`] so they can be searched immediately
/// and handed over in bulk later via `swap_and_drain` / `drain_all`.
pub struct DeferredIndexer {
    /// Pending `(id, vector)` entries that have not been drained yet.
    buffer: Arc<DeltaBuffer>,

    /// Held for the duration of `swap_and_drain` and `drain_all` so that two
    /// drains never interleave.
    swap_lock: Mutex<()>,

    /// Ids passed to `remove` since the last drain; `search` filters these
    /// out of the buffer results.
    deleted_ids: RwLock<FxHashSet<u64>>,

    /// Settings supplied at construction time.
    config: DeferredIndexerConfig,
}
impl DeferredIndexer {
    /// Creates an indexer with a fresh delta buffer, no recorded deletions,
    /// and the given `config`.
    #[must_use]
    pub fn new(config: DeferredIndexerConfig) -> Self {
        Self {
            buffer: Arc::new(DeltaBuffer::new()),
            swap_lock: Mutex::new(()),
            deleted_ids: RwLock::new(FxHashSet::default()),
            config,
        }
    }

    /// Returns `true` when deferred indexing is enabled in the configuration.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Buffers one `(id, vector)` entry.
    ///
    /// Does nothing and returns `false` when the indexer is disabled.
    /// Otherwise returns `true` once the pending count has reached
    /// `merge_threshold`, signalling that the caller should merge.
    pub fn push(&self, id: u64, vector: Vec<f32>) -> bool {
        if !self.config.enabled {
            return false;
        }
        self.ensure_buffer_active();
        // A previous `remove(id)` leaves `id` in `deleted_ids` until the next
        // drain. Without clearing it, `search` would filter out the vector we
        // are about to insert.
        self.clear_tombstone(id);
        self.buffer.push(id, vector);
        self.should_merge()
    }

    /// Buffers a batch of `(id, vector)` entries.
    ///
    /// Does nothing and returns `false` when the indexer is disabled.
    /// Otherwise returns `true` once the pending count has reached
    /// `merge_threshold`.
    pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) -> bool {
        if !self.config.enabled {
            return false;
        }
        self.ensure_buffer_active();

        // Bound to a local so the read guard is released before any write
        // lock is requested below.
        let has_tombstones = !self.deleted_ids.read().is_empty();
        if has_tombstones {
            // The ids are needed up front to revive re-inserted entries, so
            // the batch is materialised. The write guard is scoped so it is
            // not held while calling into the buffer.
            let batch: Vec<(u64, Vec<f32>)> = entries.into_iter().collect();
            {
                let mut deleted = self.deleted_ids.write();
                for (id, _) in &batch {
                    deleted.remove(id);
                }
            }
            self.buffer.extend(batch);
        } else {
            self.buffer.extend(entries);
        }
        self.should_merge()
    }

    /// Removes `id` from the buffer and records it as deleted so that
    /// `search` filters it out until the next drain.
    ///
    /// Does nothing when the indexer is disabled: `push`/`extend` never
    /// buffer anything in that state, so there is nothing to mask, and
    /// recording ids would only make `deleted_ids` grow.
    pub fn remove(&self, id: u64) {
        if !self.config.enabled {
            return;
        }
        self.buffer.remove(id);
        self.deleted_ids.write().insert(id);
    }

    /// Searches the buffer and returns at most `k` `(id, score)` pairs,
    /// excluding ids recorded by `remove`, ordered by `metric.sort_results`.
    #[must_use]
    pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
        let deleted = self.deleted_ids.read();
        // Ask for extra candidates so that filtering out deleted ids can
        // still leave `k` results.
        let overfetch = k.saturating_add(deleted.len());
        let buffer_results = self.buffer.search(query, overfetch, metric);
        let mut filtered = filter_deleted(buffer_results, &deleted);
        drop(deleted);
        metric.sort_results(&mut filtered);
        filtered.truncate(k);
        filtered
    }

    /// Combines HNSW results with buffer results into a single top-`k` list.
    ///
    /// When an id appears in both, the buffer entry is kept and the HNSW
    /// entry is dropped. The result is ordered by `metric.sort_results` and
    /// never holds more than `k` entries. If the buffer contributes nothing
    /// and `hnsw_results` already fits in `k`, it is returned unchanged.
    #[must_use]
    pub fn merge_with_hnsw(
        &self,
        hnsw_results: Vec<(u64, f32)>,
        query: &[f32],
        k: usize,
        metric: DistanceMetric,
    ) -> Vec<(u64, f32)> {
        let buffer_results = self.search(query, k, metric);
        if buffer_results.is_empty() {
            // Apply the same top-k bound as the merged path so the result
            // size does not depend on whether the buffer happened to be empty.
            let mut hnsw_only = hnsw_results;
            if hnsw_only.len() > k {
                metric.sort_results(&mut hnsw_only);
                hnsw_only.truncate(k);
            }
            return hnsw_only;
        }

        let buffer_ids: FxHashSet<u64> = buffer_results.iter().map(|(id, _)| *id).collect();
        let mut combined: Vec<(u64, f32)> = hnsw_results
            .into_iter()
            .filter(|(id, _)| !buffer_ids.contains(id))
            .collect();
        combined.extend(buffer_results);
        metric.sort_results(&mut combined);
        combined.truncate(k);
        combined
    }

    /// Drains every pending entry and leaves the buffer active again.
    ///
    /// Holds `swap_lock` for the whole operation, clears the recorded
    /// deletions, and returns the drained `(id, vector)` entries.
    pub fn swap_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
        let _guard = self.swap_lock.lock();
        let drained = self.buffer.deactivate_and_drain();
        self.deleted_ids.write().clear();
        self.buffer.activate();
        drained
    }

    /// Number of entries currently held in the buffer.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` when the pending count has reached `merge_threshold`.
    #[must_use]
    pub fn should_merge(&self) -> bool {
        self.buffer.len() >= self.config.merge_threshold
    }

    /// Returns `true` when the indexer is enabled and the buffer reports
    /// itself as searchable.
    #[must_use]
    pub fn is_searchable(&self) -> bool {
        self.config.enabled && self.buffer.is_searchable()
    }

    /// Drains every pending entry without reactivating the buffer.
    ///
    /// Holds `swap_lock` for the whole operation and clears the recorded
    /// deletions. A later `push`/`extend` reactivates the buffer through
    /// `ensure_buffer_active`.
    pub fn drain_all(&self) -> Vec<(u64, Vec<f32>)> {
        let _guard = self.swap_lock.lock();
        let all = self.buffer.deactivate_and_drain();
        self.deleted_ids.write().clear();
        all
    }

    /// Activates the buffer if it is not already active.
    fn ensure_buffer_active(&self) {
        if !self.buffer.is_active() {
            self.buffer.activate();
        }
    }

    /// Forgets a recorded deletion of `id`, if there is one.
    ///
    /// Checks under the shared lock first so the common case (no tombstone)
    /// never takes the exclusive lock.
    fn clear_tombstone(&self, id: u64) {
        // Bound to a local so the read guard is dropped before `write()`.
        let tombstoned = self.deleted_ids.read().contains(&id);
        if tombstoned {
            self.deleted_ids.write().remove(&id);
        }
    }
}
/// Drops every `(id, score)` pair whose id is in `deleted`, keeping the
/// remaining pairs in their original order.
fn filter_deleted(results: Vec<(u64, f32)>, deleted: &FxHashSet<u64>) -> Vec<(u64, f32)> {
    let mut kept = results;
    // With no recorded deletions the input is handed back untouched.
    if !deleted.is_empty() {
        kept.retain(|(id, _)| !deleted.contains(id));
    }
    kept
}
// Unit tests for `DeferredIndexer` and `DeferredIndexerConfig`.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
// Builds an enabled config with the given merge threshold; other fields keep their defaults.
fn enabled_config(threshold: usize) -> DeferredIndexerConfig {
DeferredIndexerConfig {
enabled: true,
merge_threshold: threshold,
..DeferredIndexerConfig::default()
}
}
// Pushing into an enabled indexer increases the pending count.
#[test]
fn test_deferred_push_when_enabled() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0, 0.0, 0.0]);
idx.push(2, vec![0.0, 1.0, 0.0]);
assert_eq!(idx.pending_count(), 2);
}
// `push` reports `true` only once the pending count reaches the threshold.
#[test]
fn test_deferred_push_returns_true_at_threshold() {
let idx = DeferredIndexer::new(enabled_config(3));
assert!(!idx.push(1, vec![1.0]));
assert!(!idx.push(2, vec![2.0]));
assert!(idx.push(3, vec![3.0]), "third push should hit threshold");
}
// A disabled indexer (the default config) ignores pushes entirely.
#[test]
fn test_deferred_push_noop_when_disabled() {
let config = DeferredIndexerConfig::default(); let idx = DeferredIndexer::new(config);
let triggered = idx.push(1, vec![1.0, 2.0]);
assert!(!triggered);
assert_eq!(idx.pending_count(), 0);
}
// A batch that fills the buffer up to the threshold makes `extend` return `true`.
#[test]
fn test_deferred_extend_returns_true_at_threshold() {
let idx = DeferredIndexer::new(enabled_config(3));
let entries = vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])];
assert!(idx.extend(entries), "batch should hit threshold");
}
// Buffered vectors are searchable; the vector identical to the query ranks first.
#[test]
fn test_deferred_search_finds_buffered_vectors() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0, 0.0]);
idx.push(2, vec![0.0, 1.0]);
let results = idx.search(&[1.0, 0.0], 2, DistanceMetric::Cosine);
assert_eq!(results.len(), 2);
assert_eq!(results[0].0, 1);
}
// An id passed to `remove` no longer shows up in search results.
#[test]
fn test_deferred_search_filters_deleted_ids() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0, 0.0, 0.0]);
idx.push(2, vec![0.0, 1.0, 0.0]);
idx.push(3, vec![0.0, 0.0, 1.0]);
idx.remove(2);
let results = idx.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Euclidean);
let ids: Vec<u64> = results.iter().map(|(id, _)| *id).collect();
assert!(!ids.contains(&2), "deleted ID 2 must not appear in results");
assert_eq!(ids.len(), 2);
}
// `swap_and_drain` returns every pending entry and empties the buffer.
#[test]
fn test_deferred_swap_and_drain() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0]);
idx.push(2, vec![2.0]);
let drained = idx.swap_and_drain();
assert_eq!(drained.len(), 2);
assert_eq!(idx.pending_count(), 0, "buffer should be empty after drain");
}
// `swap_and_drain` also resets the recorded deletions.
#[test]
fn test_deferred_swap_and_drain_clears_deleted_ids() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0]);
idx.remove(1);
let _drained = idx.swap_and_drain();
assert!(idx.deleted_ids.read().is_empty());
}
// After `swap_and_drain`, new pushes are accepted and the buffer is searchable.
#[test]
fn test_deferred_swap_and_drain_reactivates_buffer() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0]);
let _ = idx.swap_and_drain();
idx.push(2, vec![2.0]);
assert_eq!(idx.pending_count(), 1, "push after drain must succeed");
assert!(
idx.is_searchable(),
"buffer should be searchable after push"
);
}
// `drain_all` empties the buffer and, unlike `swap_and_drain`, leaves it unsearchable.
#[test]
fn test_deferred_drain_all_leaves_buffer_inactive() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0]);
let _ = idx.drain_all();
assert!(
!idx.is_searchable(),
"buffer must not be searchable immediately after drain_all"
);
assert_eq!(
idx.pending_count(),
0,
"buffer should be empty after drain_all"
);
}
// Merging deduplicates by id and keeps the buffer's score for an id present in both sources.
#[test]
fn test_deferred_merge_with_hnsw() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(10, vec![0.9, 0.1]);
idx.push(30, vec![0.5, 0.5]);
let hnsw = vec![(10, 0.95_f32), (20, 0.80_f32)];
let merged = idx.merge_with_hnsw(hnsw, &[1.0, 0.0], 3, DistanceMetric::Cosine);
let ids: Vec<u64> = merged.iter().map(|(id, _)| *id).collect();
let unique: HashSet<u64> = ids.iter().copied().collect();
assert_eq!(ids.len(), unique.len(), "no duplicate IDs");
assert_eq!(merged.len(), 3);
assert!(ids.contains(&10));
assert!(ids.contains(&20));
assert!(ids.contains(&30));
// The score for id 10 must differ from the HNSW-supplied 0.95, i.e. it came from the buffer.
let id10_score = merged.iter().find(|(id, _)| *id == 10).map(|(_, s)| *s);
assert!(
(id10_score.unwrap_or(0.0) - 0.95).abs() > f32::EPSILON,
"buffer score should be authoritative for id=10, not HNSW"
);
}
// With nothing buffered, the HNSW results pass through unchanged.
#[test]
fn test_deferred_merge_with_hnsw_empty_buffer() {
let idx = DeferredIndexer::new(enabled_config(1024));
let hnsw = vec![(1, 0.9_f32), (2, 0.8_f32)];
let merged = idx.merge_with_hnsw(hnsw.clone(), &[1.0, 0.0], 5, DistanceMetric::Cosine);
assert_eq!(merged, hnsw);
}
// `drain_all` returns every pending entry and leaves the buffer empty and unsearchable.
#[test]
fn test_deferred_drain_all() {
let idx = DeferredIndexer::new(enabled_config(1024));
idx.push(1, vec![1.0]);
idx.push(2, vec![2.0]);
let all = idx.drain_all();
assert_eq!(all.len(), 2);
assert_eq!(idx.pending_count(), 0);
assert!(!idx.is_searchable(), "not searchable after drain_all");
}
// A config survives a JSON round trip with all fields intact.
#[test]
fn test_deferred_config_serde() {
let config = DeferredIndexerConfig {
enabled: true,
merge_threshold: 512,
max_buffer_age_ms: 3000,
};
let json = serde_json::to_string(&config).expect("serialize");
let restored: DeferredIndexerConfig = serde_json::from_str(&json).expect("deserialize");
assert!(restored.enabled);
assert_eq!(restored.merge_threshold, 512);
assert_eq!(restored.max_buffer_age_ms, 3000);
}
// An empty JSON object deserializes to the documented defaults.
#[test]
fn test_deferred_config_serde_defaults() {
let json = "{}";
let config: DeferredIndexerConfig = serde_json::from_str(json).expect("deserialize empty");
assert!(!config.enabled);
assert_eq!(config.merge_threshold, DEFAULT_MERGE_THRESHOLD);
assert_eq!(config.max_buffer_age_ms, DEFAULT_MAX_BUFFER_AGE_MS);
}
// `should_merge` flips to `true` exactly when the pending count reaches the threshold.
#[test]
fn test_deferred_should_merge_reflects_threshold() {
let idx = DeferredIndexer::new(enabled_config(2));
assert!(!idx.should_merge());
idx.push(1, vec![1.0]);
assert!(!idx.should_merge());
idx.push(2, vec![2.0]);
assert!(idx.should_merge());
}
// `is_enabled` mirrors the `enabled` flag of the config passed to `new`.
#[test]
fn test_deferred_is_enabled_reflects_config() {
let enabled = DeferredIndexer::new(enabled_config(1024));
assert!(enabled.is_enabled());
let disabled = DeferredIndexer::new(DeferredIndexerConfig::default());
assert!(!disabled.is_enabled());
}
}