use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Kind of mutation recorded in a [`ChangeRecord`].
///
/// Variant order is kept stable because index-based serde formats encode
/// enum variants by position.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ChangeKind {
    /// A key that did not previously exist was added.
    Insert,
    /// An existing key's vector and metadata were replaced.
    Update,
    /// A key was removed.
    Delete,
}
/// One entry in a store's change log; also the unit of replication carried
/// inside a [`StoreDelta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Sequence number assigned by the originating store when the change was made.
    pub seq: u64,
    /// Key the change applies to.
    pub key: String,
    /// What kind of change this is.
    pub kind: ChangeKind,
    /// New vector for `Insert`/`Update` records (merging rejects such records
    /// when this is `None`); always `None` for records built as deletes.
    pub vector: Option<Vec<f32>>,
    /// Metadata written with the change; empty for records built as deletes.
    pub metadata: HashMap<String, String>,
}
impl ChangeRecord {
fn insert(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
Self {
seq,
key,
kind: ChangeKind::Insert,
vector: Some(vector),
metadata,
}
}
fn update(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
Self {
seq,
key,
kind: ChangeKind::Update,
vector: Some(vector),
metadata,
}
}
fn delete(seq: u64, key: String) -> Self {
Self {
seq,
key,
kind: ChangeKind::Delete,
vector: None,
metadata: HashMap::new(),
}
}
}
/// A live value held by the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEntry {
    /// The stored vector.
    pub vector: Vec<f32>,
    /// Free-form string metadata attached to the vector.
    pub metadata: HashMap<String, String>,
    /// Sequence number of the change (local or merged) that last wrote this
    /// entry; compared against incoming record `seq`s to resolve conflicts.
    pub version: u64,
}
/// A batch of change records exported by one store for another to merge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreDelta {
    /// The exporting store's sequence counter at export time.
    pub source_seq: u64,
    /// Cursor the export was taken from; `changes` holds records with `seq > since_seq`.
    pub since_seq: u64,
    /// Exported change records, in the order they appear in the source's log.
    pub changes: Vec<ChangeRecord>,
}
impl StoreDelta {
    /// Number of change records carried by this delta.
    pub fn len(&self) -> usize {
        let Self { changes, .. } = self;
        changes.len()
    }

    /// `true` when the delta carries no change records at all.
    pub fn is_empty(&self) -> bool {
        let Self { changes, .. } = self;
        changes.is_empty()
    }
}
/// Per-call outcome counters reported by a delta merge.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MergeResult {
    /// `Insert` records that created a key absent locally.
    pub inserts_applied: usize,
    /// Records that overwrote an existing local entry, plus `Update` records
    /// that created a key absent locally.
    pub updates_applied: usize,
    /// `Delete` records that removed an existing local entry.
    pub deletes_applied: usize,
    /// Records ignored because the local entry's version was >= the record's seq.
    pub conflicts_skipped: usize,
}
impl MergeResult {
    /// Count of records that actually changed local state (skipped conflicts excluded).
    pub fn total_applied(&self) -> usize {
        let applied = [
            self.inserts_applied,
            self.updates_applied,
            self.deletes_applied,
        ];
        applied.iter().sum()
    }
}
/// Point-in-time snapshot of a store's counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaStoreStats {
    /// Number of live keys.
    pub entry_count: usize,
    /// The store's sequence counter when the snapshot was taken.
    pub current_seq: u64,
    /// Number of records currently held in the change log (shrinks after compaction).
    pub log_length: usize,
    /// Successful local inserts; records applied by merges are not counted.
    pub total_inserts: u64,
    /// Successful local updates; records applied by merges are not counted.
    pub total_updates: u64,
    /// Successful local deletes; records applied by merges are not counted.
    pub total_deletes: u64,
    /// Number of merge calls that completed without returning an error.
    pub total_merges: u64,
}
/// In-memory key -> vector store that records every local mutation in an
/// append-only change log, so peers can pull incremental deltas and merge them.
#[derive(Debug, Clone)]
pub struct DeltaSyncVectorStore {
    /// Live entries, keyed by user key.
    entries: HashMap<String, StoredEntry>,
    /// Locally-originated changes in ascending `seq` order. Changes applied
    /// by merging a remote delta are not appended here.
    change_log: Vec<ChangeRecord>,
    /// Sequence counter; the last number handed out to a change.
    seq: u64,
    /// Count of successful local inserts.
    total_inserts: u64,
    /// Count of successful local updates.
    total_updates: u64,
    /// Count of successful local deletes.
    total_deletes: u64,
    /// Count of merges that completed without error.
    total_merges: u64,
}
impl Default for DeltaSyncVectorStore {
fn default() -> Self {
Self::new()
}
}
impl DeltaSyncVectorStore {
    /// Creates an empty store: no entries, empty change log, sequence counter at 0.
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
            change_log: Vec::new(),
            seq: 0,
            total_inserts: 0,
            total_updates: 0,
            total_deletes: 0,
            total_merges: 0,
        }
    }

    /// Increments the sequence counter `seq` and returns the new value.
    ///
    /// Takes the counter field rather than `&mut self` so callers may keep a
    /// borrow of `entries` alive across the call.
    ///
    /// # Errors
    ///
    /// Returns an error instead of wrapping when the counter is already at
    /// `u64::MAX`. Local writes alone never get there, but `merge_delta`
    /// fast-forwards the counter to remote sequence numbers, and a wrapped
    /// counter would break log ordering and conflict resolution.
    fn advance_seq(seq: &mut u64) -> Result<u64> {
        let next = seq
            .checked_add(1)
            .ok_or_else(|| anyhow!("Sequence counter exhausted; cannot record further changes"))?;
        *seq = next;
        Ok(next)
    }

    /// Inserts a new key with empty metadata; see [`Self::insert_with_metadata`].
    ///
    /// # Errors
    ///
    /// Fails if `key` already exists.
    pub fn insert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
        self.insert_with_metadata(key, vector, HashMap::new())
    }

    /// Inserts a new key, logs an `Insert` record, and returns the sequence
    /// number assigned to the change (also stored as the entry's `version`).
    ///
    /// # Errors
    ///
    /// Fails without modifying the store if `key` already exists (use
    /// [`Self::update`] instead) or if the sequence counter is exhausted.
    pub fn insert_with_metadata(
        &mut self,
        key: String,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    ) -> Result<u64> {
        if self.entries.contains_key(&key) {
            return Err(anyhow!("Key '{}' already exists; use update()", key));
        }
        let seq = Self::advance_seq(&mut self.seq)?;
        // The log and the live entry each own a copy of the payload.
        self.change_log.push(ChangeRecord::insert(
            seq,
            key.clone(),
            vector.clone(),
            metadata.clone(),
        ));
        self.entries.insert(
            key,
            StoredEntry {
                vector,
                metadata,
                version: seq,
            },
        );
        self.total_inserts += 1;
        Ok(seq)
    }

    /// Replaces the vector of an existing key and resets its metadata to an
    /// empty map; use [`Self::update_with_metadata`] to set metadata.
    ///
    /// # Errors
    ///
    /// Fails if `key` does not exist.
    pub fn update(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
        self.update_with_metadata(key, vector, HashMap::new())
    }

    /// Replaces both the vector and the metadata of an existing key, logs an
    /// `Update` record, and returns the sequence number assigned to the change.
    ///
    /// # Errors
    ///
    /// Fails without modifying the store if `key` does not exist (use
    /// [`Self::insert`] instead) or if the sequence counter is exhausted.
    pub fn update_with_metadata(
        &mut self,
        key: String,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    ) -> Result<u64> {
        // Single lookup; the borrow of `entries` is disjoint from `seq`/`change_log`.
        let entry = match self.entries.get_mut(&key) {
            Some(entry) => entry,
            None => return Err(anyhow!("Key '{}' does not exist; use insert()", key)),
        };
        let seq = Self::advance_seq(&mut self.seq)?;
        self.change_log.push(ChangeRecord::update(
            seq,
            key,
            vector.clone(),
            metadata.clone(),
        ));
        entry.vector = vector;
        entry.metadata = metadata;
        entry.version = seq;
        self.total_updates += 1;
        Ok(seq)
    }

    /// Updates `key` if it exists, otherwise inserts it; either way the entry
    /// ends up with empty metadata. Returns the assigned sequence number.
    pub fn upsert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
        if self.entries.contains_key(&key) {
            self.update(key, vector)
        } else {
            self.insert(key, vector)
        }
    }

    /// Removes `key`, logs a `Delete` record, and returns the assigned sequence number.
    ///
    /// # Errors
    ///
    /// Fails without modifying the store if `key` is not present or the
    /// sequence counter is exhausted.
    pub fn delete(&mut self, key: &str) -> Result<u64> {
        if !self.entries.contains_key(key) {
            return Err(anyhow!("Key '{}' not found", key));
        }
        let seq = Self::advance_seq(&mut self.seq)?;
        self.change_log.push(ChangeRecord::delete(seq, key.to_string()));
        self.entries.remove(key);
        self.total_deletes += 1;
        Ok(seq)
    }

    /// Returns the live entry for `key`, if any.
    pub fn get(&self, key: &str) -> Option<&StoredEntry> {
        self.entries.get(key)
    }

    /// `true` when `key` currently has a live entry.
    pub fn contains(&self, key: &str) -> bool {
        self.entries.contains_key(key)
    }

    /// Number of live entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` when the store holds no live entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Current value of the sequence counter. It advances on every local
    /// write and is fast-forwarded by [`Self::merge_delta`].
    pub fn current_seq(&self) -> u64 {
        self.seq
    }

    /// Returns the live keys in unspecified (hash map) order.
    pub fn keys(&self) -> Vec<&str> {
        self.entries.keys().map(String::as_str).collect()
    }

    /// Returns every logged change with `seq > since_seq`, in ascending `seq` order.
    ///
    /// Use 0 for a full export; a puller typically passes the `source_seq`
    /// of the last delta it received. Only locally-originated changes are
    /// logged, so changes received through [`Self::merge_delta`] are not
    /// re-exported. After [`Self::compact_log`], only the newest record per
    /// key remains in the log.
    pub fn export_delta(&self, since_seq: u64) -> StoreDelta {
        // The log is strictly ascending by `seq` (records are appended with
        // increasing numbers and compaction preserves order), so a binary
        // search finds the cut-off point.
        let start = self.change_log.partition_point(|r| r.seq <= since_seq);
        StoreDelta {
            source_seq: self.seq,
            since_seq,
            changes: self.change_log[start..].to_vec(),
        }
    }

    /// Applies a delta exported by another store and reports what happened.
    ///
    /// Conflict rule: a record is applied only when its `seq` is strictly
    /// greater than the local entry's `version`; otherwise the local entry is
    /// kept and the record is counted in `conflicts_skipped`. Insert/Update
    /// records for absent keys create the key. Delete records for absent keys
    /// are ignored and not counted. Applied records are not appended to the
    /// local change log.
    ///
    /// The local sequence counter is fast-forwarded to at least every record's
    /// `seq` (a Lamport clock). Local writes made afterwards are therefore
    /// numbered above every change already observed, and they win when sent
    /// back to the peer.
    ///
    /// NOTE: sequence numbers from different stores can coincide, and ties keep
    /// the local value on each side. Deletions leave no tombstone in `entries`,
    /// so a later merge of an older Insert/Update for a deleted key re-creates it.
    ///
    /// # Errors
    ///
    /// Returns an error, before anything is modified, if any Insert/Update
    /// record carries no vector.
    pub fn merge_delta(&mut self, delta: &StoreDelta) -> Result<MergeResult> {
        // Validate the whole delta up front so a malformed one is never half-applied.
        let malformed = delta
            .changes
            .iter()
            .any(|r| r.kind != ChangeKind::Delete && r.vector.is_none());
        if malformed {
            return Err(anyhow!("Insert/Update record missing vector data"));
        }

        let mut result = MergeResult::default();
        for record in &delta.changes {
            // Lamport clock: never let the local counter fall behind a seen change.
            self.seq = self.seq.max(record.seq);
            match record.kind {
                ChangeKind::Insert | ChangeKind::Update => {
                    // Cannot fail after the validation pass above.
                    let vector = record
                        .vector
                        .clone()
                        .ok_or_else(|| anyhow!("Insert/Update record missing vector data"))?;
                    match self.entries.get_mut(&record.key) {
                        Some(existing) if existing.version >= record.seq => {
                            result.conflicts_skipped += 1;
                        }
                        Some(existing) => {
                            existing.vector = vector;
                            existing.metadata = record.metadata.clone();
                            existing.version = record.seq;
                            result.updates_applied += 1;
                        }
                        None => {
                            self.entries.insert(
                                record.key.clone(),
                                StoredEntry {
                                    vector,
                                    metadata: record.metadata.clone(),
                                    version: record.seq,
                                },
                            );
                            if record.kind == ChangeKind::Insert {
                                result.inserts_applied += 1;
                            } else {
                                result.updates_applied += 1;
                            }
                        }
                    }
                }
                ChangeKind::Delete => {
                    let stale = matches!(
                        self.entries.get(&record.key),
                        Some(existing) if existing.version >= record.seq
                    );
                    if stale {
                        result.conflicts_skipped += 1;
                    } else if self.entries.remove(&record.key).is_some() {
                        result.deletes_applied += 1;
                    }
                }
            }
        }
        self.total_merges += 1;
        Ok(result)
    }

    /// Returns a snapshot of the store's counters.
    pub fn stats(&self) -> DeltaStoreStats {
        DeltaStoreStats {
            entry_count: self.entries.len(),
            current_seq: self.seq,
            log_length: self.change_log.len(),
            total_inserts: self.total_inserts,
            total_updates: self.total_updates,
            total_deletes: self.total_deletes,
            total_merges: self.total_merges,
        }
    }

    /// Shrinks the change log to the newest record per key, preserving `seq` order.
    ///
    /// Live entries are untouched. A Delete record is kept when it is the
    /// newest change for its key, so pullers still learn about the deletion.
    /// Intermediate history is discarded: a puller exporting from an older
    /// cursor receives only the latest change for each key.
    pub fn compact_log(&mut self) {
        // Walk newest-to-oldest: the first time a key is seen is its newest record.
        let keep: Vec<bool> = {
            let mut seen = std::collections::HashSet::with_capacity(self.change_log.len());
            let mut flags: Vec<bool> = self
                .change_log
                .iter()
                .rev()
                .map(|r| seen.insert(r.key.as_str()))
                .collect();
            flags.reverse();
            flags
        };
        // `retain` visits each element exactly once, in order, so the flags line up.
        let mut flags = keep.into_iter();
        self.change_log.retain(|_| flags.next().unwrap_or(false));
        // Release the memory freed by compaction.
        self.change_log.shrink_to_fit();
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for DeltaSyncVectorStore: local CRUD, change-log bookkeeping,
    // delta export/merge, and log compaction.
    use super::*;
    use anyhow::Result;

    fn make_store() -> DeltaSyncVectorStore {
        DeltaSyncVectorStore::new()
    }

    #[test]
    fn test_new_store_is_empty() {
        let store = make_store();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert_eq!(store.current_seq(), 0);
    }

    #[test]
    fn test_insert_increments_seq() -> Result<()> {
        let mut store = make_store();
        let seq = store.insert("k1".to_string(), vec![1.0, 2.0])?;
        assert_eq!(seq, 1);
        assert_eq!(store.current_seq(), 1);
        assert_eq!(store.len(), 1);
        Ok(())
    }

    #[test]
    fn test_insert_duplicate_key_fails() -> Result<()> {
        let mut store = make_store();
        store.insert("k1".to_string(), vec![1.0])?;
        let err = store.insert("k1".to_string(), vec![2.0]);
        assert!(err.is_err());
        Ok(())
    }

    #[test]
    fn test_update_existing_key() -> Result<()> {
        let mut store = make_store();
        store.insert("k1".to_string(), vec![1.0, 0.0])?;
        let seq = store.update("k1".to_string(), vec![2.0, 0.0])?;
        assert_eq!(seq, 2);
        let entry = store.get("k1").expect("k1 not found");
        assert_eq!(entry.vector, vec![2.0, 0.0]);
        Ok(())
    }

    #[test]
    fn test_update_missing_key_fails() {
        let mut store = make_store();
        let err = store.update("nonexistent".to_string(), vec![1.0]);
        assert!(err.is_err());
    }

    #[test]
    fn test_delete_existing_key() -> Result<()> {
        let mut store = make_store();
        store.insert("k1".to_string(), vec![1.0])?;
        let seq = store.delete("k1")?;
        assert_eq!(seq, 2);
        assert!(!store.contains("k1"));
        assert_eq!(store.len(), 0);
        Ok(())
    }

    #[test]
    fn test_delete_missing_key_fails() {
        let mut store = make_store();
        let err = store.delete("missing");
        assert!(err.is_err());
    }

    #[test]
    fn test_upsert_insert_path() -> Result<()> {
        let mut store = make_store();
        let seq = store.upsert("k".to_string(), vec![1.0])?;
        assert_eq!(seq, 1);
        assert_eq!(store.len(), 1);
        Ok(())
    }

    #[test]
    fn test_upsert_update_path() -> Result<()> {
        let mut store = make_store();
        store.insert("k".to_string(), vec![1.0])?;
        store.upsert("k".to_string(), vec![99.0])?;
        let entry = store.get("k").expect("k not found");
        assert_eq!(entry.vector, vec![99.0]);
        Ok(())
    }

    #[test]
    fn test_contains_after_insert() -> Result<()> {
        let mut store = make_store();
        store.insert("x".to_string(), vec![0.0])?;
        assert!(store.contains("x"));
        assert!(!store.contains("y"));
        Ok(())
    }

    #[test]
    fn test_change_log_grows_with_operations() -> Result<()> {
        let mut store = make_store();
        store.insert("k1".to_string(), vec![1.0])?;
        store.insert("k2".to_string(), vec![2.0])?;
        store.update("k1".to_string(), vec![3.0])?;
        store.delete("k2")?;
        let stats = store.stats();
        assert_eq!(stats.log_length, 4);
        Ok(())
    }

    #[test]
    fn test_change_log_records_correct_kinds() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.update("a".to_string(), vec![2.0])?;
        store.delete("a")?;
        assert_eq!(store.change_log[0].kind, ChangeKind::Insert);
        assert_eq!(store.change_log[1].kind, ChangeKind::Update);
        assert_eq!(store.change_log[2].kind, ChangeKind::Delete);
        Ok(())
    }

    #[test]
    fn test_export_delta_full() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.insert("b".to_string(), vec![2.0])?;
        let delta = store.export_delta(0);
        assert_eq!(delta.changes.len(), 2);
        assert_eq!(delta.source_seq, 2);
        Ok(())
    }

    #[test]
    fn test_export_delta_incremental() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.insert("b".to_string(), vec![2.0])?;
        let delta = store.export_delta(1);
        assert_eq!(delta.changes.len(), 1);
        assert_eq!(delta.changes[0].key, "b");
        Ok(())
    }

    #[test]
    fn test_export_delta_empty_when_up_to_date() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        let delta = store.export_delta(1);
        assert!(delta.is_empty());
        Ok(())
    }

    #[test]
    fn test_export_delta_after_compaction() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.update("a".to_string(), vec![2.0])?;
        store.insert("b".to_string(), vec![3.0])?;
        store.compact_log();
        let incremental = store.export_delta(2);
        assert_eq!(incremental.len(), 1);
        assert_eq!(incremental.changes[0].key, "b");
        let full = store.export_delta(0);
        assert_eq!(full.len(), 2);
        assert_eq!(full.changes[0].seq, 2);
        assert_eq!(full.changes[1].seq, 3);
        Ok(())
    }

    #[test]
    fn test_merge_delta_inserts_new_entries() -> Result<()> {
        let mut source = make_store();
        source.insert("remote_key".to_string(), vec![42.0])?;
        let delta = source.export_delta(0);
        let mut target = make_store();
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.inserts_applied, 1);
        assert!(target.contains("remote_key"));
        assert_eq!(
            target.get("remote_key").expect("test value").vector,
            vec![42.0]
        );
        Ok(())
    }

    #[test]
    fn test_merge_delta_deletes_entries() -> Result<()> {
        let mut target = make_store();
        target.insert("to_delete".to_string(), vec![1.0])?;
        let delta = StoreDelta {
            source_seq: 99,
            since_seq: 0,
            changes: vec![ChangeRecord::delete(99, "to_delete".to_string())],
        };
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.deletes_applied, 1);
        assert!(!target.contains("to_delete"));
        Ok(())
    }

    #[test]
    fn test_merge_stale_delete_is_skipped() -> Result<()> {
        let mut target = make_store();
        target.insert("k".to_string(), vec![1.0])?;
        let delta = StoreDelta {
            source_seq: 1,
            since_seq: 0,
            changes: vec![ChangeRecord::delete(1, "k".to_string())],
        };
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.conflicts_skipped, 1);
        assert_eq!(result.deletes_applied, 0);
        assert!(target.contains("k"));
        Ok(())
    }

    #[test]
    fn test_merge_delta_conflict_local_wins() -> Result<()> {
        let mut target = make_store();
        for i in 0..5 {
            target.insert(format!("k{}", i), vec![i as f32])?;
        }
        let delta = StoreDelta {
            source_seq: 1,
            since_seq: 0,
            changes: vec![ChangeRecord::update(
                1,
                "k0".to_string(),
                vec![999.0],
                HashMap::new(),
            )],
        };
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.conflicts_skipped, 1);
        assert_eq!(target.get("k0").expect("test value").vector, vec![0.0]);
        Ok(())
    }

    #[test]
    fn test_merge_delta_remote_wins_newer_seq() -> Result<()> {
        let mut target = make_store();
        target.insert("k".to_string(), vec![1.0])?;
        let delta = StoreDelta {
            source_seq: 100,
            since_seq: 0,
            changes: vec![ChangeRecord::update(
                100,
                "k".to_string(),
                vec![200.0],
                HashMap::new(),
            )],
        };
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.updates_applied, 1);
        assert_eq!(target.get("k").expect("test value").vector, vec![200.0]);
        Ok(())
    }

    #[test]
    fn test_merge_same_delta_twice_is_idempotent() -> Result<()> {
        let mut source = make_store();
        source.insert("x".to_string(), vec![1.0])?;
        let delta = source.export_delta(0);
        let mut target = make_store();
        target.merge_delta(&delta)?;
        let second = target.merge_delta(&delta)?;
        assert_eq!(second.total_applied(), 0);
        assert_eq!(second.conflicts_skipped, 1);
        assert_eq!(target.len(), 1);
        Ok(())
    }

    #[test]
    fn test_merge_rejects_record_without_vector() {
        let mut target = make_store();
        let mut bad = ChangeRecord::delete(1, "k".to_string());
        bad.kind = ChangeKind::Insert;
        let delta = StoreDelta {
            source_seq: 1,
            since_seq: 0,
            changes: vec![bad],
        };
        assert!(target.merge_delta(&delta).is_err());
        assert!(target.is_empty());
        assert_eq!(target.stats().total_merges, 0);
    }

    #[test]
    fn test_merge_empty_delta_noop() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        let delta = StoreDelta {
            source_seq: 0,
            since_seq: 0,
            changes: Vec::new(),
        };
        let result = store.merge_delta(&delta)?;
        assert_eq!(result.total_applied(), 0);
        assert_eq!(store.len(), 1);
        Ok(())
    }

    #[test]
    fn test_merge_result_total_applied() -> Result<()> {
        let mut source = make_store();
        source.insert("a".to_string(), vec![1.0])?;
        source.insert("b".to_string(), vec![2.0])?;
        let delta = source.export_delta(0);
        let mut target = make_store();
        let result = target.merge_delta(&delta)?;
        assert_eq!(result.total_applied(), 2);
        Ok(())
    }

    #[test]
    fn test_stats_counters() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.insert("b".to_string(), vec![2.0])?;
        store.update("a".to_string(), vec![10.0])?;
        store.delete("b")?;
        let stats = store.stats();
        assert_eq!(stats.total_inserts, 2);
        assert_eq!(stats.total_updates, 1);
        assert_eq!(stats.total_deletes, 1);
        assert_eq!(stats.entry_count, 1);
        assert_eq!(stats.current_seq, 4);
        Ok(())
    }

    #[test]
    fn test_stats_merge_counter() -> Result<()> {
        let mut source = make_store();
        source.insert("x".to_string(), vec![1.0])?;
        let delta = source.export_delta(0);
        let mut target = make_store();
        target.merge_delta(&delta)?;
        target.merge_delta(&StoreDelta {
            source_seq: 0,
            since_seq: 0,
            changes: Vec::new(),
        })?;
        assert_eq!(target.stats().total_merges, 2);
        Ok(())
    }

    #[test]
    fn test_compact_log_reduces_size() -> Result<()> {
        let mut store = make_store();
        store.insert("k".to_string(), vec![1.0])?;
        store.update("k".to_string(), vec![2.0])?;
        store.update("k".to_string(), vec![3.0])?;
        assert_eq!(store.stats().log_length, 3);
        store.compact_log();
        assert_eq!(store.stats().log_length, 1);
        Ok(())
    }

    #[test]
    fn test_compact_log_preserves_state() -> Result<()> {
        let mut store = make_store();
        for i in 0..5 {
            store.insert(format!("k{}", i), vec![i as f32])?;
        }
        store.update("k0".to_string(), vec![99.0])?;
        store.compact_log();
        assert_eq!(store.get("k0").expect("test value").vector, vec![99.0]);
        assert_eq!(store.len(), 5);
        Ok(())
    }

    #[test]
    fn test_compact_log_keeps_delete_tombstone() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.delete("a")?;
        store.compact_log();
        assert_eq!(store.stats().log_length, 1);
        assert_eq!(store.change_log[0].kind, ChangeKind::Delete);
        assert_eq!(store.change_log[0].seq, 2);
        Ok(())
    }

    #[test]
    fn test_keys_returns_all_live_keys() -> Result<()> {
        let mut store = make_store();
        store.insert("a".to_string(), vec![1.0])?;
        store.insert("b".to_string(), vec![2.0])?;
        store.insert("c".to_string(), vec![3.0])?;
        store.delete("b")?;
        let mut keys = store.keys();
        keys.sort();
        assert_eq!(keys, vec!["a", "c"]);
        Ok(())
    }

    #[test]
    fn test_insert_with_metadata_stored() -> Result<()> {
        let mut store = make_store();
        let mut meta = HashMap::new();
        meta.insert("source".to_string(), "test".to_string());
        store.insert_with_metadata("k".to_string(), vec![1.0], meta)?;
        let entry = store.get("k").expect("k not found");
        assert_eq!(
            entry.metadata.get("source").map(String::as_str),
            Some("test")
        );
        Ok(())
    }
}