use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use common::VectorId;
/// Snapshot of online-update counters.
///
/// Returned by `UpdateBuffer::stats`, `OnlineIndex::stats` and
/// `OnlineUpdatable::online_stats`; each producer decides which of these
/// counters it actually maintains.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OnlineUpdateStats {
/// Inserts counted by the producer (`UpdateBuffer` counts them at flush time).
pub total_inserts: u64,
/// Deletes counted by the producer (`OnlineIndex` counts only newly added tombstones).
pub total_deletes: u64,
/// Updates counted by the producer; `UpdateBuffer` never increments it.
pub total_updates: u64,
/// Deletions still awaiting cleanup: queued deletes for `UpdateBuffer`,
/// outstanding tombstones for `OnlineIndex`.
pub pending_cleanup: usize,
/// Last-maintenance time/duration in ms — NOTE(review): exact meaning is not
/// shown here; `UpdateBuffer::stats` always reports 0.
pub last_maintenance_ms: u64,
/// Number of maintenance runs; `UpdateBuffer::stats` always reports 0.
pub maintenance_count: u64,
}
/// Thresholds and scheduling knobs for online updates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnlineUpdateConfig {
/// Pending-delete count at which `UpdateBuffer::should_flush` and
/// `OnlineIndex::needs_maintenance` start returning `true` (compared with `>=`).
pub max_pending_deletes: usize,
/// Buffered-insert count at which `UpdateBuffer::should_flush` starts
/// returning `true` (compared with `>=`).
pub max_pending_inserts: usize,
/// Whether background maintenance is enabled. Not consulted by the code in
/// this section — NOTE(review): confirm where it is read.
pub enable_background_maintenance: bool,
/// Background maintenance interval, in milliseconds per the `_ms` suffix.
/// Not consulted by the code in this section.
pub maintenance_interval_ms: u64,
}
impl Default for OnlineUpdateConfig {
fn default() -> Self {
Self {
max_pending_deletes: 1000,
max_pending_inserts: 100,
enable_background_maintenance: true,
maintenance_interval_ms: 60_000, }
}
}
/// A vector index that can be mutated while it is serving reads.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so
/// implementors mutate through interior mutability. Errors are reported as
/// plain `String`s.
pub trait OnlineUpdatable: Send + Sync {
    /// Insert `vector` under `id`.
    fn online_insert(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String>;

    /// Insert several `(id, vector)` pairs.
    ///
    /// Returns a count, presumably the number inserted (implementation-defined).
    fn online_insert_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String>;

    /// Delete `id`.
    ///
    /// The `bool` presumably reports whether `id` was present (implementation-defined).
    fn online_delete(&self, id: &VectorId) -> Result<bool, String>;

    /// Delete several ids.
    ///
    /// Returns a count, presumably the number removed (implementation-defined).
    fn online_delete_batch(&self, ids: &[VectorId]) -> Result<usize, String>;

    /// Replace the vector stored under `id`.
    ///
    /// The default runs [`online_delete`](Self::online_delete), ignores its
    /// `bool`, and then runs [`online_insert`](Self::online_insert). Updating an
    /// absent id therefore behaves like an insert.
    ///
    /// This is not atomic:
    /// - a delete error is returned before anything is inserted;
    /// - an insert error is returned after the delete has already been applied.
    fn online_update(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String> {
        self.online_delete(&id)
            .and_then(|_| self.online_insert(id, vector))
    }

    /// Replace many vectors at once.
    ///
    /// The default deletes every id in one [`online_delete_batch`](Self::online_delete_batch)
    /// call, then inserts all pairs with [`online_insert_batch`](Self::online_insert_batch).
    /// It returns the insert batch's count. The same non-atomicity caveats as
    /// [`online_update`](Self::online_update) apply.
    fn online_update_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String> {
        let stale_ids = vectors
            .iter()
            .map(|(id, _)| id.clone())
            .collect::<Vec<VectorId>>();
        self.online_delete_batch(&stale_ids)
            .and_then(|_| self.online_insert_batch(vectors))
    }

    /// Whether `id` is currently present.
    fn contains(&self, id: &VectorId) -> bool;

    /// Number of vectors currently held.
    fn len(&self) -> usize;

    /// `true` when [`len`](Self::len) is zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Run one maintenance pass and report what it did.
    fn maintenance(&self) -> Result<MaintenanceResult, String>;

    /// Snapshot of this index's update counters.
    fn online_stats(&self) -> OnlineUpdateStats;
}
/// Outcome of one `OnlineUpdatable::maintenance` pass.
///
/// Values are filled in by the implementor. The field meanings below are
/// inferred from the names — NOTE(review): confirm against implementations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MaintenanceResult {
/// Presumably the number of deleted vectors purged during the pass.
pub vectors_cleaned: usize,
/// Presumably an estimate of memory freed, in bytes.
pub memory_reclaimed_bytes: usize,
/// Presumably the wall-clock duration of the pass, in milliseconds.
pub duration_ms: u64,
/// Presumably whether the index structure was rebuilt during the pass.
pub restructured: bool,
}
/// Write buffer that batches inserts and deletes until they are flushed.
///
/// Each queue sits behind its own `RwLock`. The counters are atomics, so every
/// method works through `&self`.
#[derive(Debug)]
pub struct UpdateBuffer {
/// Flush thresholds consulted by `should_flush`.
config: OnlineUpdateConfig,
/// Inserts queued by `buffer_insert`, in call order (duplicate ids are kept).
pending_inserts: RwLock<Vec<(VectorId, Vec<f32>)>>,
/// Ids queued by `buffer_delete`; a set, so repeated ids collapse.
pending_deletes: RwLock<HashSet<VectorId>>,
/// Counters updated by the buffer/flush methods.
stats: UpdateBufferStats,
}
/// Atomic counters backing `UpdateBuffer::stats`.
#[derive(Debug, Default)]
struct UpdateBufferStats {
/// Number of inserts handed out by `flush_inserts` so far.
total_inserts: AtomicU64,
/// Number of deletes handed out by `flush_deletes` so far.
total_deletes: AtomicU64,
/// Reported by `UpdateBuffer::stats` but never incremented in the code shown.
total_updates: AtomicU64,
/// Inserts currently buffered. Incremented by `buffer_insert` and decremented
/// by `flush_inserts`; deletes are not counted. Not read by the code shown.
pending_count: AtomicUsize,
}
impl UpdateBuffer {
pub fn new(config: OnlineUpdateConfig) -> Self {
Self {
config,
pending_inserts: RwLock::new(Vec::new()),
pending_deletes: RwLock::new(HashSet::new()),
stats: UpdateBufferStats::default(),
}
}
pub fn buffer_insert(&self, id: VectorId, vector: Vec<f32>) {
let mut inserts = self
.pending_inserts
.write()
.expect("pending_inserts lock poisoned in buffer_insert");
inserts.push((id, vector));
self.stats.pending_count.fetch_add(1, Ordering::Relaxed);
}
pub fn buffer_delete(&self, id: VectorId) {
let mut deletes = self
.pending_deletes
.write()
.expect("pending_deletes lock poisoned in buffer_delete");
deletes.insert(id);
}
pub fn should_flush(&self) -> bool {
let inserts = self
.pending_inserts
.read()
.expect("pending_inserts lock poisoned in should_flush");
let deletes = self
.pending_deletes
.read()
.expect("pending_deletes lock poisoned in should_flush");
inserts.len() >= self.config.max_pending_inserts
|| deletes.len() >= self.config.max_pending_deletes
}
pub fn flush_inserts(&self) -> Vec<(VectorId, Vec<f32>)> {
let mut inserts = self
.pending_inserts
.write()
.expect("pending_inserts lock poisoned in flush_inserts");
let flushed: Vec<_> = inserts.drain(..).collect();
self.stats
.total_inserts
.fetch_add(flushed.len() as u64, Ordering::Relaxed);
self.stats
.pending_count
.fetch_sub(flushed.len(), Ordering::Relaxed);
flushed
}
pub fn flush_deletes(&self) -> HashSet<VectorId> {
let mut deletes = self
.pending_deletes
.write()
.expect("pending_deletes lock poisoned in flush_deletes");
let flushed: HashSet<_> = deletes.drain().collect();
self.stats
.total_deletes
.fetch_add(flushed.len() as u64, Ordering::Relaxed);
flushed
}
pub fn stats(&self) -> OnlineUpdateStats {
OnlineUpdateStats {
total_inserts: self.stats.total_inserts.load(Ordering::Relaxed),
total_deletes: self.stats.total_deletes.load(Ordering::Relaxed),
total_updates: self.stats.total_updates.load(Ordering::Relaxed),
pending_cleanup: self
.pending_deletes
.read()
.expect("pending_deletes lock poisoned in stats")
.len(),
last_maintenance_ms: 0,
maintenance_count: 0,
}
}
}
/// Wraps an index `T` with tombstone-based deletes, a per-id vector store and
/// update counters.
///
/// The wrapped index is reachable only through `read`/`write`; none of the
/// bookkeeping methods touch it.
pub struct OnlineIndex<T> {
/// The wrapped index.
inner: Arc<RwLock<T>>,
/// Built in `new` but not otherwise used by the code shown (hence the `_`).
_buffer: UpdateBuffer,
/// Tombstoned ids; `has_vector` reports these as absent.
deleted_ids: RwLock<HashSet<VectorId>>,
/// Vectors stored by `store_vector`, keyed by id.
id_to_vector: RwLock<HashMap<VectorId, Vec<f32>>>,
/// Counters returned (as a clone) by `stats`.
stats: Arc<RwLock<OnlineUpdateStats>>,
/// Thresholds; `max_pending_deletes` drives `needs_maintenance`.
config: OnlineUpdateConfig,
}
impl<T> OnlineIndex<T> {
    /// Wrap `inner` with tombstone tracking, a per-id vector store and update
    /// counters.
    ///
    /// A clone of `config` seeds the internal `UpdateBuffer`. The original is
    /// kept for [`needs_maintenance`](Self::needs_maintenance).
    pub fn new(inner: T, config: OnlineUpdateConfig) -> Self {
        Self {
            inner: Arc::new(RwLock::new(inner)),
            _buffer: UpdateBuffer::new(config.clone()),
            deleted_ids: RwLock::new(HashSet::new()),
            id_to_vector: RwLock::new(HashMap::new()),
            stats: Arc::new(RwLock::new(OnlineUpdateStats::default())),
            config,
        }
    }

    /// Acquire shared access to the wrapped index.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock is poisoned.
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
        self.inner
            .read()
            .expect("inner index lock poisoned in read")
    }

    /// Acquire exclusive access to the wrapped index.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock is poisoned.
    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> {
        self.inner
            .write()
            .expect("inner index lock poisoned in write")
    }

    /// Returns `true` if `id` is currently tombstoned.
    ///
    /// An id is tombstoned when it was marked via [`mark_deleted`](Self::mark_deleted)
    /// and has not yet been cleared.
    pub fn is_deleted(&self, id: &VectorId) -> bool {
        self.deleted_ids
            .read()
            .expect("deleted_ids lock poisoned in is_deleted")
            .contains(id)
    }

    /// Tombstone `id`.
    ///
    /// Returns `true` if `id` was not already tombstoned. Only in that case are
    /// `total_deletes` and `pending_cleanup` incremented. Any stored vector is
    /// left in place; [`remove_vector`](Self::remove_vector) drops it.
    pub fn mark_deleted(&self, id: VectorId) -> bool {
        let mut deleted = self
            .deleted_ids
            .write()
            .expect("deleted_ids lock poisoned in mark_deleted");
        // `id` is owned and unused afterwards, so move it into the set.
        let was_new = deleted.insert(id);
        if was_new {
            // Lock order: `deleted_ids` then `stats`, the same as `clear_deleted`.
            let mut stats = self
                .stats
                .write()
                .expect("stats lock poisoned in mark_deleted");
            stats.total_deletes += 1;
            stats.pending_cleanup += 1;
        }
        was_new
    }

    /// Clear the tombstones for `ids`.
    ///
    /// Ids that are not tombstoned are ignored. `pending_cleanup` is
    /// decremented (saturating) once per tombstone actually removed.
    pub fn clear_deleted(&self, ids: &[VectorId]) {
        let mut deleted = self
            .deleted_ids
            .write()
            .expect("deleted_ids lock poisoned in clear_deleted");
        let mut stats = self
            .stats
            .write()
            .expect("stats lock poisoned in clear_deleted");
        for id in ids {
            if deleted.remove(id) {
                stats.pending_cleanup = stats.pending_cleanup.saturating_sub(1);
            }
        }
    }

    /// Number of outstanding tombstones.
    pub fn pending_deletes(&self) -> usize {
        self.deleted_ids
            .read()
            .expect("deleted_ids lock poisoned in pending_deletes")
            .len()
    }

    /// Returns `true` once the tombstone count reaches `config.max_pending_deletes`.
    pub fn needs_maintenance(&self) -> bool {
        self.pending_deletes() >= self.config.max_pending_deletes
    }

    /// Increment `total_inserts`.
    ///
    /// This is caller-driven; `store_vector` does not call it.
    pub fn record_insert(&self) {
        let mut stats = self
            .stats
            .write()
            .expect("stats lock poisoned in record_insert");
        stats.total_inserts += 1;
    }

    /// Increment `total_updates`. This is caller-driven.
    pub fn record_update(&self) {
        let mut stats = self
            .stats
            .write()
            .expect("stats lock poisoned in record_update");
        stats.total_updates += 1;
    }

    /// Return a copy of the current counters.
    pub fn stats(&self) -> OnlineUpdateStats {
        self.stats
            .read()
            .expect("stats lock poisoned in stats")
            .clone()
    }

    /// Store, or overwrite, the vector for `id`.
    ///
    /// An existing tombstone for `id` is not cleared. While it remains,
    /// [`has_vector`](Self::has_vector) keeps returning `false` for `id`.
    pub fn store_vector(&self, id: VectorId, vector: Vec<f32>) {
        let mut map = self
            .id_to_vector
            .write()
            .expect("id_to_vector lock poisoned in store_vector");
        map.insert(id, vector);
    }

    /// Remove and return the stored vector for `id`, if any.
    ///
    /// Tombstones and counters are not touched.
    pub fn remove_vector(&self, id: &VectorId) -> Option<Vec<f32>> {
        let mut map = self
            .id_to_vector
            .write()
            .expect("id_to_vector lock poisoned in remove_vector");
        map.remove(id)
    }

    /// Returns `true` if a vector is stored for `id` and `id` is not tombstoned.
    pub fn has_vector(&self, id: &VectorId) -> bool {
        // Lock order: `id_to_vector` then `deleted_ids` (same as `vector_count`).
        let map = self
            .id_to_vector
            .read()
            .expect("id_to_vector lock poisoned in has_vector");
        map.contains_key(id) && !self.is_deleted(id)
    }

    /// Number of stored vectors that are not tombstoned.
    ///
    /// This is how many ids [`has_vector`](Self::has_vector) would return `true` for.
    /// A tombstone without a stored vector does not reduce the count. That covers
    /// an id that was never stored, or one whose vector was already removed.
    pub fn vector_count(&self) -> usize {
        let map = self
            .id_to_vector
            .read()
            .expect("id_to_vector lock poisoned in vector_count");
        let deleted = self
            .deleted_ids
            .read()
            .expect("deleted_ids lock poisoned in vector_count");
        // Count only tombstones that actually shadow a stored vector. The set
        // holds distinct ids, so `shadowed <= map.len()` and the subtraction
        // cannot underflow.
        let shadowed = deleted.iter().filter(|id| map.contains_key(*id)).count();
        map.len() - shadowed
    }
}
/// Log of inserts and deletes recorded relative to a base snapshot time.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeltaLog {
/// Inserted `(id, vector)` pairs, in record order.
pub inserts: Vec<(VectorId, Vec<f32>)>,
/// Deleted ids, in record order (not deduplicated).
pub deletes: Vec<VectorId>,
/// Time of the base snapshot this log is relative to; units are not shown here.
pub base_snapshot_time: u64,
/// Incremented once per recorded operation; `clear` does not reset it.
pub sequence_number: u64,
}
impl DeltaLog {
pub fn new(base_snapshot_time: u64) -> Self {
Self {
inserts: Vec::new(),
deletes: Vec::new(),
base_snapshot_time,
sequence_number: 0,
}
}
pub fn record_insert(&mut self, id: VectorId, vector: Vec<f32>) {
self.inserts.push((id, vector));
self.sequence_number += 1;
}
pub fn record_delete(&mut self, id: VectorId) {
self.deletes.push(id);
self.sequence_number += 1;
}
pub fn should_compact(&self, threshold: usize) -> bool {
self.inserts.len() + self.deletes.len() > threshold
}
pub fn clear(&mut self, new_base_time: u64) {
self.inserts.clear();
self.deletes.clear();
self.base_snapshot_time = new_base_time;
}
pub fn len(&self) -> usize {
self.inserts.len() + self.deletes.len()
}
pub fn is_empty(&self) -> bool {
self.inserts.is_empty() && self.deletes.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_update_buffer() {
        let config = OnlineUpdateConfig::default();
        let buffer = UpdateBuffer::new(config);
        buffer.buffer_insert("v1".to_string(), vec![1.0, 2.0, 3.0]);
        buffer.buffer_insert("v2".to_string(), vec![4.0, 5.0, 6.0]);
        assert!(!buffer.should_flush());
        buffer.buffer_delete("v1".to_string());
        let inserts = buffer.flush_inserts();
        assert_eq!(inserts.len(), 2);
        let deletes = buffer.flush_deletes();
        assert_eq!(deletes.len(), 1);
        assert!(deletes.contains("v1"));
    }

    #[test]
    fn test_delta_log() {
        let mut log = DeltaLog::new(1000);
        log.record_insert("v1".to_string(), vec![1.0, 2.0]);
        log.record_insert("v2".to_string(), vec![3.0, 4.0]);
        log.record_delete("v0".to_string());
        assert_eq!(log.len(), 3);
        assert_eq!(log.inserts.len(), 2);
        assert_eq!(log.deletes.len(), 1);
        assert_eq!(log.sequence_number, 3);
        log.clear(2000);
        assert!(log.is_empty());
        assert_eq!(log.base_snapshot_time, 2000);
        // The sequence number survives `clear`.
        assert_eq!(log.sequence_number, 3);
    }

    #[test]
    fn test_online_index_wrapper() {
        let inner: Vec<(String, Vec<f32>)> = Vec::new();
        let config = OnlineUpdateConfig::default();
        let online = OnlineIndex::new(inner, config);
        online.store_vector("v1".to_string(), vec![1.0, 2.0]);
        online.store_vector("v2".to_string(), vec![3.0, 4.0]);
        assert!(online.has_vector(&"v1".to_string()));
        assert!(online.has_vector(&"v2".to_string()));
        assert_eq!(online.vector_count(), 2);
        online.mark_deleted("v1".to_string());
        assert!(!online.has_vector(&"v1".to_string()));
        assert_eq!(online.vector_count(), 1);
        assert_eq!(online.pending_deletes(), 1);
        let stats = online.stats();
        assert_eq!(stats.total_deletes, 1);
    }

    #[test]
    fn test_flush_threshold() {
        let config = OnlineUpdateConfig {
            max_pending_inserts: 5,
            max_pending_deletes: 3,
            ..Default::default()
        };
        let buffer = UpdateBuffer::new(config);
        for i in 0..4 {
            buffer.buffer_insert(format!("v{}", i), vec![i as f32]);
        }
        assert!(!buffer.should_flush());
        buffer.buffer_insert("v4".to_string(), vec![4.0]);
        assert!(buffer.should_flush());
    }

    /// Deletes trigger a flush on their own threshold, and repeated ids collapse.
    #[test]
    fn test_flush_threshold_on_deletes() {
        let config = OnlineUpdateConfig {
            max_pending_inserts: 5,
            max_pending_deletes: 3,
            ..Default::default()
        };
        let buffer = UpdateBuffer::new(config);
        buffer.buffer_delete("v0".to_string());
        buffer.buffer_delete("v1".to_string());
        buffer.buffer_delete("v1".to_string());
        assert!(!buffer.should_flush());
        buffer.buffer_delete("v2".to_string());
        assert!(buffer.should_flush());
        assert_eq!(buffer.flush_deletes().len(), 3);
        assert!(!buffer.should_flush());
    }

    /// Buffer counters advance at flush time, not at buffer time.
    #[test]
    fn test_buffer_stats_track_flushes() {
        let buffer = UpdateBuffer::new(OnlineUpdateConfig::default());
        buffer.buffer_insert("v1".to_string(), vec![1.0]);
        buffer.buffer_delete("v2".to_string());
        let before = buffer.stats();
        assert_eq!(before.total_inserts, 0);
        assert_eq!(before.total_deletes, 0);
        assert_eq!(before.pending_cleanup, 1);
        let _ = buffer.flush_inserts();
        let _ = buffer.flush_deletes();
        let after = buffer.stats();
        assert_eq!(after.total_inserts, 1);
        assert_eq!(after.total_deletes, 1);
        assert_eq!(after.pending_cleanup, 0);
    }

    /// Re-tombstoning is a no-op, and clearing tombstones keeps the stats in sync.
    #[test]
    fn test_mark_deleted_is_idempotent_and_clearable() {
        let online = OnlineIndex::new(Vec::<u8>::new(), OnlineUpdateConfig::default());
        assert!(online.mark_deleted("v1".to_string()));
        assert!(!online.mark_deleted("v1".to_string()));
        let stats = online.stats();
        assert_eq!(stats.total_deletes, 1);
        assert_eq!(stats.pending_cleanup, 1);
        online.clear_deleted(&["v1".to_string(), "unknown".to_string()]);
        assert_eq!(online.pending_deletes(), 0);
        assert!(!online.is_deleted(&"v1".to_string()));
        assert_eq!(online.stats().pending_cleanup, 0);
    }

    /// `should_compact` uses a strict `>` comparison.
    #[test]
    fn test_delta_log_compaction_threshold() {
        let mut log = DeltaLog::new(0);
        log.record_insert("v1".to_string(), vec![1.0]);
        log.record_delete("v2".to_string());
        assert!(!log.should_compact(2));
        assert!(log.should_compact(1));
    }
}