use crate::distance::DistanceMetric;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU8, Ordering};
// Lifecycle states stored in `DeltaBuffer::state`.

/// Buffer is off: `push`/`extend` ignore their input and `search` returns nothing.
const INACTIVE: u8 = 0;
/// Buffer accepts new entries through `push`/`extend` and is included in searches.
const ACTIVE: u8 = 1;
/// Set at the start of `deactivate_and_drain`: new entries are rejected (the
/// state is no longer `ACTIVE`) while `search` is still permitted.
const DRAINING: u8 = 2;
/// A lock-protected list of `(id, vector)` entries paired with a three-state
/// lifecycle flag (`INACTIVE`, `ACTIVE`, `DRAINING`).
///
/// Entries are only accepted while the flag is `ACTIVE`, and are brute-force
/// scanned by `search` while the flag is `ACTIVE` or `DRAINING`.
pub struct DeltaBuffer {
/// Buffered `(id, vector)` entries in insertion order.
points: RwLock<Vec<(u64, Vec<f32>)>>,
/// Current lifecycle state; one of `INACTIVE`, `ACTIVE` or `DRAINING`.
state: AtomicU8,
}
impl DeltaBuffer {
#[must_use]
pub fn new() -> Self {
Self {
points: RwLock::new(Vec::new()),
state: AtomicU8::new(INACTIVE),
}
}
#[must_use]
pub fn is_active(&self) -> bool {
self.state.load(Ordering::Acquire) == ACTIVE
}
#[must_use]
pub fn is_searchable(&self) -> bool {
let s = self.state.load(Ordering::Acquire);
s == ACTIVE || s == DRAINING
}
pub fn activate(&self) {
self.state.store(ACTIVE, Ordering::Release);
}
pub fn deactivate_and_drain(&self) -> Vec<(u64, Vec<f32>)> {
self.state.store(DRAINING, Ordering::Release);
let mut points = self.points.write();
let drained = std::mem::take(&mut *points);
self.state.store(INACTIVE, Ordering::Release);
drop(points);
drained
}
pub fn push(&self, id: u64, vector: Vec<f32>) {
let mut points = self.points.write();
if self.state.load(Ordering::Acquire) == ACTIVE {
points.push((id, vector));
}
}
pub fn extend(&self, entries: impl IntoIterator<Item = (u64, Vec<f32>)>) {
let mut points = self.points.write();
if self.state.load(Ordering::Acquire) == ACTIVE {
points.extend(entries);
}
}
#[must_use]
pub fn len(&self) -> usize {
self.points.read().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn stats(&self) -> (usize, bool) {
let len = self.points.read().len();
(len, len == 0)
}
#[must_use]
pub fn search(&self, query: &[f32], k: usize, metric: DistanceMetric) -> Vec<(u64, f32)> {
let current_state = self.state.load(Ordering::Acquire);
if current_state != ACTIVE && current_state != DRAINING {
return Vec::new();
}
let snapshot: Vec<(u64, Vec<f32>)> = self.points.read().clone();
if snapshot.is_empty() {
return Vec::new();
}
let mut results: Vec<(u64, f32)> = snapshot
.iter()
.map(|(id, vec)| (*id, metric.calculate(query, vec)))
.collect();
metric.sort_results(&mut results);
results.truncate(k);
results
}
}
impl Default for DeltaBuffer {
fn default() -> Self {
Self::new()
}
}
/// Combines HNSW hits with hits from the delta buffer.
///
/// If `delta` is not searchable, or its search yields nothing, `hnsw_results`
/// is handed back untouched (it is not re-sorted or truncated). Otherwise
/// every HNSW hit whose id also appears among the delta hits is discarded,
/// the delta hits are appended, the combined list is ordered with
/// `metric.sort_results` and cut down to at most `k` entries.
///
/// NOTE(review): only ids among the delta's top-`k` hits shadow HNSW hits; an
/// id that is stored in the delta but ranks outside its top `k` does not
/// suppress the HNSW entry with the same id — confirm this is intended.
#[must_use]
pub fn merge_with_delta(
    hnsw_results: Vec<(u64, f32)>,
    delta: &DeltaBuffer,
    query: &[f32],
    k: usize,
    metric: DistanceMetric,
) -> Vec<(u64, f32)> {
    if !delta.is_searchable() {
        return hnsw_results;
    }
    let from_delta = delta.search(query, k, metric);
    if from_delta.is_empty() {
        return hnsw_results;
    }

    // Ids served by the delta take precedence over the same ids from HNSW.
    let shadowed: HashSet<u64> = from_delta.iter().map(|&(id, _)| id).collect();

    let mut combined = hnsw_results;
    combined.retain(|(id, _)| !shadowed.contains(id));
    combined.extend(from_delta);

    metric.sort_results(&mut combined);
    combined.truncate(k);
    combined
}
/// `ScoredResult` flavour of the HNSW + delta merge.
///
/// If `delta` is not searchable, or its search yields nothing, `hnsw_results`
/// is handed back untouched. Otherwise HNSW hits whose id also appears among
/// the delta hits are discarded, the survivors are converted to `(id, score)`
/// pairs, the delta hits are appended, the list is ordered with
/// `metric.sort_results`, cut down to at most `k` entries and converted back
/// to `ScoredResult`.
#[must_use]
pub fn merge_with_delta_scored(
    hnsw_results: Vec<crate::scored_result::ScoredResult>,
    delta: &DeltaBuffer,
    query: &[f32],
    k: usize,
    metric: DistanceMetric,
) -> Vec<crate::scored_result::ScoredResult> {
    if !delta.is_searchable() {
        return hnsw_results;
    }
    let from_delta = delta.search(query, k, metric);
    if from_delta.is_empty() {
        return hnsw_results;
    }

    // Ids served by the delta take precedence over the same ids from HNSW.
    let shadowed: HashSet<u64> = from_delta.iter().map(|&(id, _)| id).collect();

    let mut survivors = hnsw_results;
    survivors.retain(|sr| !shadowed.contains(&sr.id));

    // Sorting works on plain `(id, score)` pairs, so convert before merging.
    let mut combined: Vec<(u64, f32)> = survivors.into_iter().map(Into::into).collect();
    combined.extend(from_delta);
    metric.sort_results(&mut combined);
    combined.truncate(k);

    combined
        .into_iter()
        .map(crate::scored_result::ScoredResult::from)
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a buffer that has already been switched to the active state.
    fn active_buffer() -> DeltaBuffer {
        let delta = DeltaBuffer::new();
        delta.activate();
        delta
    }

    /// A buffer built with `new` must start out inactive.
    #[test]
    fn test_stream_delta_buffer_compiles_and_defaults_inactive() {
        let delta = DeltaBuffer::new();
        let active = delta.is_active();
        assert!(!active, "new DeltaBuffer should be inactive by default");
    }

    /// A buffer built through `Default` must start out inactive too.
    #[test]
    fn test_stream_delta_buffer_default_trait() {
        let delta = DeltaBuffer::default();
        let active = delta.is_active();
        assert!(!active);
    }

    /// Pushed entries are searchable and the result honours `k`.
    #[test]
    fn test_stream_delta_push_and_search() {
        let delta = active_buffer();
        delta.push(1, vec![1.0, 0.0, 0.0]);
        delta.push(2, vec![0.0, 1.0, 0.0]);
        delta.push(3, vec![0.5, 0.5, 0.0]);

        let hits = delta.search(&[1.0, 0.0, 0.0], 2, DistanceMetric::Cosine);

        assert_eq!(hits.len(), 2, "should return at most k=2 results");
        let (best_id, _) = hits[0];
        assert_eq!(
            best_id, 1,
            "closest match should be id=1 (identical vector)"
        );
    }

    /// Searching a buffer that was never activated yields nothing.
    #[test]
    fn test_stream_delta_search_returns_empty_when_inactive() {
        let delta = DeltaBuffer::new();
        delta.push(1, vec![1.0, 0.0, 0.0]);

        let hits = delta.search(&[1.0, 0.0, 0.0], 10, DistanceMetric::Cosine);

        assert!(hits.is_empty(), "inactive delta should return no results");
    }

    /// Writes against an inactive buffer leave it empty.
    #[test]
    fn test_stream_delta_push_noop_when_inactive() {
        let delta = DeltaBuffer::new();
        delta.push(1, vec![1.0, 0.0]);
        delta.extend(std::iter::once((2, vec![0.0, 1.0])));

        let stored = delta.len();
        assert_eq!(stored, 0, "push/extend should be no-ops when inactive");
    }

    /// Cosine results come back ordered from best to worst match.
    #[test]
    fn test_stream_delta_search_cosine_ordering() {
        let delta = active_buffer();
        delta.push(10, vec![1.0, 0.0]);
        delta.push(20, vec![0.0, 1.0]);
        delta.push(30, vec![1.0, 1.0]);

        let hits = delta.search(&[1.0, 0.0], 3, DistanceMetric::Cosine);

        let order: Vec<u64> = hits.iter().map(|&(id, _)| id).collect();
        assert_eq!(order, vec![10, 30, 20]);
    }

    /// Euclidean results come back ordered from nearest to farthest.
    #[test]
    fn test_stream_delta_search_euclidean_ordering() {
        let delta = active_buffer();
        delta.push(1, vec![0.0, 0.0]);
        delta.push(2, vec![1.0, 0.0]);
        delta.push(3, vec![3.0, 4.0]);

        let hits = delta.search(&[0.0, 0.0], 3, DistanceMetric::Euclidean);

        let order: Vec<u64> = hits.iter().map(|&(id, _)| id).collect();
        assert_eq!(order, vec![1, 2, 3]);
    }

    /// Merging against an inactive delta is a pass-through.
    #[test]
    fn test_stream_delta_merge_with_delta_inactive() {
        let delta = DeltaBuffer::new();
        let expected: Vec<(u64, f32)> = vec![(1, 0.9), (2, 0.8)];

        let merged = merge_with_delta(
            expected.clone(),
            &delta,
            &[1.0, 0.0],
            5,
            DistanceMetric::Cosine,
        );

        assert_eq!(merged, expected, "inactive delta should return HNSW unchanged");
    }

    /// Overlapping ids are reported once and the merged list is cut to `k`.
    #[test]
    fn test_stream_delta_merge_dedup_and_truncate() {
        let delta = active_buffer();
        delta.push(1, vec![0.9, 0.1]);
        delta.push(3, vec![0.8, 0.2]);
        let from_hnsw = vec![(1, 0.95), (2, 0.80)];

        let merged = merge_with_delta(from_hnsw, &delta, &[1.0, 0.0], 2, DistanceMetric::Cosine);

        assert_eq!(merged.len(), 2);
        let distinct: HashSet<u64> = merged.iter().map(|&(id, _)| id).collect();
        assert_eq!(
            merged.len(),
            distinct.len(),
            "no duplicate IDs in merged results"
        );
    }

    /// Merging against an active but empty delta is a pass-through.
    #[test]
    fn test_stream_delta_merge_empty_delta() {
        let delta = active_buffer();
        let expected: Vec<(u64, f32)> = vec![(1, 0.9), (2, 0.8)];

        let merged = merge_with_delta(
            expected.clone(),
            &delta,
            &[1.0, 0.0],
            5,
            DistanceMetric::Cosine,
        );

        assert_eq!(
            merged, expected,
            "empty active delta should return HNSW unchanged"
        );
    }

    /// Full lifecycle: inactive -> active -> drained (inactive and empty).
    #[test]
    fn test_stream_delta_activate_deactivate_drain() {
        let delta = DeltaBuffer::new();
        assert!(!delta.is_active());

        delta.activate();
        assert!(delta.is_active());

        delta.push(1, vec![1.0]);
        delta.push(2, vec![2.0]);
        assert_eq!(delta.len(), 2);

        let drained = delta.deactivate_and_drain();
        assert!(!delta.is_active());
        assert!(delta.is_empty());

        let drained_ids: Vec<u64> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(drained_ids, vec![1, 2]);
    }

    /// `extend` stores every entry of the batch.
    #[test]
    fn test_stream_delta_extend() {
        let delta = active_buffer();
        let batch = vec![(1, vec![1.0]), (2, vec![2.0]), (3, vec![3.0])];

        delta.extend(batch);

        assert_eq!(delta.len(), 3);
    }

    /// `stats` reports the entry count together with the emptiness flag.
    #[test]
    fn test_stream_delta_stats() {
        let delta = active_buffer();
        delta.push(1, vec![1.0]);

        let (count, empty) = delta.stats();

        assert_eq!(count, 1);
        assert!(!empty);
    }
}