1use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
/// The kind of mutation described by a [`ChangeRecord`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeKind {
    /// A key was added.
    Insert,
    /// The vector and metadata of a key were replaced.
    Update,
    /// A key was removed.
    Delete,
}
36
/// One entry of a store's change log: a single mutation applied to a single key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Sequence number assigned to this change by the store that produced it.
    pub seq: u64,
    /// Key the change applies to.
    pub key: String,
    /// Whether the change is an insert, an update, or a delete.
    pub kind: ChangeKind,
    /// Vector payload. The constructors in this file set it to `Some` for
    /// inserts/updates and to `None` for deletes.
    pub vector: Option<Vec<f32>>,
    /// Metadata carried with the change. The delete constructor in this file
    /// leaves it empty.
    pub metadata: HashMap<String, String>,
}
53
54impl ChangeRecord {
55 fn insert(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
56 Self {
57 seq,
58 key,
59 kind: ChangeKind::Insert,
60 vector: Some(vector),
61 metadata,
62 }
63 }
64
65 fn update(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
66 Self {
67 seq,
68 key,
69 kind: ChangeKind::Update,
70 vector: Some(vector),
71 metadata,
72 }
73 }
74
75 fn delete(seq: u64, key: String) -> Self {
76 Self {
77 seq,
78 key,
79 kind: ChangeKind::Delete,
80 vector: None,
81 metadata: HashMap::new(),
82 }
83 }
84}
85
/// The current value held by the store for one key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEntry {
    /// The stored vector.
    pub vector: Vec<f32>,
    /// Key/value metadata stored alongside the vector.
    pub metadata: HashMap<String, String>,
    /// Sequence number of the change that last wrote this entry: the store's
    /// own sequence number for local writes, or the record's `seq` when the
    /// entry was written by a merge.
    pub version: u64,
}
98
/// A batch of change records exchanged between stores.
///
/// The field descriptions below reflect how
/// [`DeltaSyncVectorStore::export_delta`] fills them in; the fields are public,
/// so hand-built deltas may contain anything.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreDelta {
    /// Sequence number of the exporting store at the time of export.
    pub source_seq: u64,
    /// Exclusive lower bound: only changes with `seq` greater than this are included.
    pub since_seq: u64,
    /// The change records, in the order they appear in the exporter's log.
    pub changes: Vec<ChangeRecord>,
}
111
112impl StoreDelta {
113 pub fn len(&self) -> usize {
115 self.changes.len()
116 }
117
118 pub fn is_empty(&self) -> bool {
120 self.changes.is_empty()
121 }
122}
123
/// Per-call summary returned by [`DeltaSyncVectorStore::merge_delta`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MergeResult {
    /// `Insert` records that created a key that was not present locally.
    pub inserts_applied: usize,
    /// Records that overwrote an existing entry, plus `Update` records that
    /// created a key that was not present locally.
    pub updates_applied: usize,
    /// `Delete` records that removed an existing entry.
    pub deletes_applied: usize,
    /// Records ignored because the local entry's version was greater than or
    /// equal to the record's sequence number.
    pub conflicts_skipped: usize,
}
138
139impl MergeResult {
140 pub fn total_applied(&self) -> usize {
142 self.inserts_applied + self.updates_applied + self.deletes_applied
143 }
144}
145
/// Point-in-time counters returned by [`DeltaSyncVectorStore::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaStoreStats {
    /// Number of live entries currently in the store.
    pub entry_count: usize,
    /// The store's current sequence number.
    pub current_seq: u64,
    /// Number of records currently held in the change log.
    pub log_length: usize,
    /// Successful local insert operations since the store was created.
    pub total_inserts: u64,
    /// Successful local update operations since the store was created.
    pub total_updates: u64,
    /// Successful local delete operations since the store was created.
    pub total_deletes: u64,
    /// Calls to `merge_delta` that returned `Ok`.
    pub total_merges: u64,
}
166
/// An in-memory key → vector store that logs every local mutation so that
/// changes can be exported as a [`StoreDelta`] and merged into another store.
pub struct DeltaSyncVectorStore {
    /// Live entries, keyed by their string key.
    entries: HashMap<String, StoredEntry>,
    /// Records of local inserts, updates and deletes, appended in the order
    /// they happened.
    change_log: Vec<ChangeRecord>,
    /// Sequence number of the most recent local mutation; `0` for a new store.
    seq: u64,
    /// Count of successful local inserts.
    total_inserts: u64,
    /// Count of successful local updates.
    total_updates: u64,
    /// Count of successful local deletes.
    total_deletes: u64,
    /// Count of successful `merge_delta` calls.
    total_merges: u64,
}
184
185impl Default for DeltaSyncVectorStore {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl DeltaSyncVectorStore {
192 pub fn new() -> Self {
194 Self {
195 entries: HashMap::new(),
196 change_log: Vec::new(),
197 seq: 0,
198 total_inserts: 0,
199 total_updates: 0,
200 total_deletes: 0,
201 total_merges: 0,
202 }
203 }
204
205 pub fn insert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
209 self.insert_with_metadata(key, vector, HashMap::new())
210 }
211
212 pub fn insert_with_metadata(
214 &mut self,
215 key: String,
216 vector: Vec<f32>,
217 metadata: HashMap<String, String>,
218 ) -> Result<u64> {
219 if self.entries.contains_key(&key) {
220 return Err(anyhow!("Key '{}' already exists; use update()", key));
221 }
222 self.seq += 1;
223 let seq = self.seq;
224 let record = ChangeRecord::insert(seq, key.clone(), vector.clone(), metadata.clone());
225 self.change_log.push(record);
226 self.entries.insert(
227 key,
228 StoredEntry {
229 vector,
230 metadata,
231 version: seq,
232 },
233 );
234 self.total_inserts += 1;
235 Ok(seq)
236 }
237
238 pub fn update(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
240 self.update_with_metadata(key, vector, HashMap::new())
241 }
242
243 pub fn update_with_metadata(
245 &mut self,
246 key: String,
247 vector: Vec<f32>,
248 metadata: HashMap<String, String>,
249 ) -> Result<u64> {
250 if !self.entries.contains_key(&key) {
251 return Err(anyhow!("Key '{}' does not exist; use insert()", key));
252 }
253 self.seq += 1;
254 let seq = self.seq;
255 let record = ChangeRecord::update(seq, key.clone(), vector.clone(), metadata.clone());
256 self.change_log.push(record);
257 if let Some(entry) = self.entries.get_mut(&key) {
258 entry.vector = vector;
259 entry.metadata = metadata;
260 entry.version = seq;
261 }
262 self.total_updates += 1;
263 Ok(seq)
264 }
265
266 pub fn upsert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
268 if self.entries.contains_key(&key) {
269 self.update(key, vector)
270 } else {
271 self.insert(key, vector)
272 }
273 }
274
275 pub fn delete(&mut self, key: &str) -> Result<u64> {
277 if !self.entries.contains_key(key) {
278 return Err(anyhow!("Key '{}' not found", key));
279 }
280 self.seq += 1;
281 let seq = self.seq;
282 let record = ChangeRecord::delete(seq, key.to_string());
283 self.change_log.push(record);
284 self.entries.remove(key);
285 self.total_deletes += 1;
286 Ok(seq)
287 }
288
289 pub fn get(&self, key: &str) -> Option<&StoredEntry> {
293 self.entries.get(key)
294 }
295
296 pub fn contains(&self, key: &str) -> bool {
298 self.entries.contains_key(key)
299 }
300
301 pub fn len(&self) -> usize {
303 self.entries.len()
304 }
305
306 pub fn is_empty(&self) -> bool {
308 self.entries.is_empty()
309 }
310
311 pub fn current_seq(&self) -> u64 {
313 self.seq
314 }
315
316 pub fn keys(&self) -> Vec<&str> {
318 self.entries.keys().map(String::as_str).collect()
319 }
320
321 pub fn export_delta(&self, since_seq: u64) -> StoreDelta {
327 let changes: Vec<ChangeRecord> = self
328 .change_log
329 .iter()
330 .filter(|r| r.seq > since_seq)
331 .cloned()
332 .collect();
333 StoreDelta {
334 source_seq: self.seq,
335 since_seq,
336 changes,
337 }
338 }
339
340 pub fn merge_delta(&mut self, delta: &StoreDelta) -> Result<MergeResult> {
349 let mut result = MergeResult::default();
350
351 for record in &delta.changes {
352 match &record.kind {
353 ChangeKind::Insert | ChangeKind::Update => {
354 let vector = record
355 .vector
356 .as_ref()
357 .ok_or_else(|| anyhow!("Insert/Update record missing vector data"))?
358 .clone();
359 let metadata = record.metadata.clone();
360
361 if let Some(existing) = self.entries.get(&record.key) {
362 if existing.version >= record.seq {
363 result.conflicts_skipped += 1;
365 continue;
366 }
367 if let Some(e) = self.entries.get_mut(&record.key) {
369 e.vector = vector;
370 e.metadata = metadata;
371 e.version = record.seq;
372 }
373 result.updates_applied += 1;
374 } else {
375 self.entries.insert(
377 record.key.clone(),
378 StoredEntry {
379 vector,
380 metadata,
381 version: record.seq,
382 },
383 );
384 if record.kind == ChangeKind::Insert {
385 result.inserts_applied += 1;
386 } else {
387 result.updates_applied += 1;
388 }
389 }
390 }
391 ChangeKind::Delete => {
392 if let Some(existing) = self.entries.get(&record.key) {
393 if existing.version >= record.seq {
394 result.conflicts_skipped += 1;
395 continue;
396 }
397 }
398 if self.entries.remove(&record.key).is_some() {
399 result.deletes_applied += 1;
400 }
401 }
402 }
403 }
404
405 self.total_merges += 1;
406 Ok(result)
407 }
408
409 pub fn stats(&self) -> DeltaStoreStats {
411 DeltaStoreStats {
412 entry_count: self.entries.len(),
413 current_seq: self.seq,
414 log_length: self.change_log.len(),
415 total_inserts: self.total_inserts,
416 total_updates: self.total_updates,
417 total_deletes: self.total_deletes,
418 total_merges: self.total_merges,
419 }
420 }
421
422 pub fn compact_log(&mut self) {
427 let mut last_seq_per_key: HashMap<String, usize> = HashMap::new();
429 for (idx, record) in self.change_log.iter().enumerate() {
430 last_seq_per_key.insert(record.key.clone(), idx);
431 }
432
433 let keep: std::collections::HashSet<usize> = last_seq_per_key.values().copied().collect();
434
435 let mut new_log = Vec::with_capacity(keep.len());
436 for (idx, record) in self.change_log.iter().enumerate() {
437 if keep.contains(&idx) {
438 new_log.push(record.clone());
439 }
440 }
441 new_log.sort_by_key(|r| r.seq);
442 self.change_log = new_log;
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Fresh, empty store used as the starting point of every test.
    fn make_store() -> DeltaSyncVectorStore {
        DeltaSyncVectorStore::new()
    }

    /// Store pre-populated, in order, with one single-element vector per
    /// `(key, value)` pair.
    fn store_with(pairs: &[(&str, f32)]) -> DeltaSyncVectorStore {
        let mut store = make_store();
        for &(key, value) in pairs {
            store.insert(key.to_string(), vec![value]).unwrap();
        }
        store
    }

    /// Delta carrying exactly one record, with `source_seq` equal to the
    /// record's own sequence number and `since_seq` of zero.
    fn single_record_delta(record: ChangeRecord) -> StoreDelta {
        StoreDelta {
            source_seq: record.seq,
            since_seq: 0,
            changes: vec![record],
        }
    }

    /// Delta with no records at all.
    fn empty_delta() -> StoreDelta {
        StoreDelta {
            source_seq: 0,
            since_seq: 0,
            changes: Vec::new(),
        }
    }

    // ---- basic CRUD -----------------------------------------------------

    #[test]
    fn test_new_store_is_empty() {
        let fresh = make_store();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
        assert_eq!(fresh.current_seq(), 0);
    }

    #[test]
    fn test_insert_increments_seq() {
        let mut store = make_store();
        let assigned = store.insert("k1".to_string(), vec![1.0, 2.0]).unwrap();
        assert_eq!(assigned, 1);
        assert_eq!(store.current_seq(), 1);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_insert_duplicate_key_fails() {
        let mut store = store_with(&[("k1", 1.0)]);
        let second_attempt = store.insert("k1".to_string(), vec![2.0]);
        assert!(second_attempt.is_err());
    }

    #[test]
    fn test_update_existing_key() {
        let mut store = make_store();
        store.insert("k1".to_string(), vec![1.0, 0.0]).unwrap();
        let assigned = store.update("k1".to_string(), vec![2.0, 0.0]).unwrap();
        assert_eq!(assigned, 2);
        let stored = store.get("k1").unwrap();
        assert_eq!(stored.vector, vec![2.0, 0.0]);
    }

    #[test]
    fn test_update_missing_key_fails() {
        let mut store = make_store();
        let outcome = store.update("nonexistent".to_string(), vec![1.0]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_delete_existing_key() {
        let mut store = store_with(&[("k1", 1.0)]);
        let assigned = store.delete("k1").unwrap();
        assert_eq!(assigned, 2);
        assert!(!store.contains("k1"));
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn test_delete_missing_key_fails() {
        let mut store = make_store();
        let outcome = store.delete("missing");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_upsert_insert_path() {
        let mut store = make_store();
        let assigned = store.upsert("k".to_string(), vec![1.0]).unwrap();
        assert_eq!(assigned, 1);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_upsert_update_path() {
        let mut store = store_with(&[("k", 1.0)]);
        store.upsert("k".to_string(), vec![99.0]).unwrap();
        let stored = store.get("k").unwrap();
        assert_eq!(stored.vector, vec![99.0]);
    }

    #[test]
    fn test_contains_after_insert() {
        let store = store_with(&[("x", 0.0)]);
        assert!(store.contains("x"));
        assert!(!store.contains("y"));
    }

    // ---- change log -----------------------------------------------------

    #[test]
    fn test_change_log_grows_with_operations() {
        let mut store = store_with(&[("k1", 1.0), ("k2", 2.0)]);
        store.update("k1".to_string(), vec![3.0]).unwrap();
        store.delete("k2").unwrap();
        assert_eq!(store.stats().log_length, 4);
    }

    #[test]
    fn test_change_log_records_correct_kinds() {
        let mut store = store_with(&[("a", 1.0)]);
        store.update("a".to_string(), vec![2.0]).unwrap();
        store.delete("a").unwrap();

        let expected = [ChangeKind::Insert, ChangeKind::Update, ChangeKind::Delete];
        for (position, kind) in expected.iter().enumerate() {
            assert_eq!(&store.change_log[position].kind, kind);
        }
    }

    // ---- export ---------------------------------------------------------

    #[test]
    fn test_export_delta_full() {
        let store = store_with(&[("a", 1.0), ("b", 2.0)]);
        let delta = store.export_delta(0);
        assert_eq!(delta.changes.len(), 2);
        assert_eq!(delta.source_seq, 2);
    }

    #[test]
    fn test_export_delta_incremental() {
        let store = store_with(&[("a", 1.0), ("b", 2.0)]);
        // Only changes strictly after seq 1 are exported.
        let delta = store.export_delta(1);
        assert_eq!(delta.changes.len(), 1);
        assert_eq!(delta.changes[0].key, "b");
    }

    #[test]
    fn test_export_delta_empty_when_up_to_date() {
        let store = store_with(&[("a", 1.0)]);
        let delta = store.export_delta(1);
        assert!(delta.is_empty());
    }

    // ---- merge ----------------------------------------------------------

    #[test]
    fn test_merge_delta_inserts_new_entries() {
        let source = store_with(&[("remote_key", 42.0)]);
        let delta = source.export_delta(0);

        let mut target = make_store();
        let outcome = target.merge_delta(&delta).unwrap();

        assert_eq!(outcome.inserts_applied, 1);
        assert!(target.contains("remote_key"));
        assert_eq!(target.get("remote_key").unwrap().vector, vec![42.0]);
    }

    #[test]
    fn test_merge_delta_deletes_entries() {
        let mut target = store_with(&[("to_delete", 1.0)]);

        // Remote seq 99 is newer than the local version, so the delete wins.
        let delta = single_record_delta(ChangeRecord::delete(99, "to_delete".to_string()));

        let outcome = target.merge_delta(&delta).unwrap();
        assert_eq!(outcome.deletes_applied, 1);
        assert!(!target.contains("to_delete"));
    }

    #[test]
    fn test_merge_delta_conflict_local_wins() {
        let mut target = make_store();
        (0..5).for_each(|i| {
            target.insert(format!("k{}", i), vec![i as f32]).unwrap();
        });

        // Remote seq 1 is not newer than the local version of "k0".
        let stale = ChangeRecord::update(1, "k0".to_string(), vec![999.0], HashMap::new());
        let delta = single_record_delta(stale);

        let outcome = target.merge_delta(&delta).unwrap();
        assert_eq!(outcome.conflicts_skipped, 1);
        assert_eq!(target.get("k0").unwrap().vector, vec![0.0]);
    }

    #[test]
    fn test_merge_delta_remote_wins_newer_seq() {
        let mut target = store_with(&[("k", 1.0)]);

        let newer = ChangeRecord::update(100, "k".to_string(), vec![200.0], HashMap::new());
        let delta = single_record_delta(newer);

        let outcome = target.merge_delta(&delta).unwrap();
        assert_eq!(outcome.updates_applied, 1);
        assert_eq!(target.get("k").unwrap().vector, vec![200.0]);
    }

    #[test]
    fn test_merge_empty_delta_noop() {
        let mut store = store_with(&[("a", 1.0)]);
        let outcome = store.merge_delta(&empty_delta()).unwrap();
        assert_eq!(outcome.total_applied(), 0);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_merge_result_total_applied() {
        let source = store_with(&[("a", 1.0), ("b", 2.0)]);
        let delta = source.export_delta(0);

        let mut target = make_store();
        let outcome = target.merge_delta(&delta).unwrap();
        assert_eq!(outcome.total_applied(), 2);
    }

    // ---- stats ----------------------------------------------------------

    #[test]
    fn test_stats_counters() {
        let mut store = store_with(&[("a", 1.0), ("b", 2.0)]);
        store.update("a".to_string(), vec![10.0]).unwrap();
        store.delete("b").unwrap();

        let snapshot = store.stats();
        assert_eq!(snapshot.total_inserts, 2);
        assert_eq!(snapshot.total_updates, 1);
        assert_eq!(snapshot.total_deletes, 1);
        assert_eq!(snapshot.entry_count, 1);
        assert_eq!(snapshot.current_seq, 4);
    }

    #[test]
    fn test_stats_merge_counter() {
        let source = store_with(&[("x", 1.0)]);
        let delta = source.export_delta(0);

        let mut target = make_store();
        target.merge_delta(&delta).unwrap();
        target.merge_delta(&empty_delta()).unwrap();

        assert_eq!(target.stats().total_merges, 2);
    }

    // ---- compaction -----------------------------------------------------

    #[test]
    fn test_compact_log_reduces_size() {
        let mut store = store_with(&[("k", 1.0)]);
        for value in [2.0, 3.0] {
            store.update("k".to_string(), vec![value]).unwrap();
        }
        assert_eq!(store.stats().log_length, 3);

        store.compact_log();
        assert_eq!(store.stats().log_length, 1);
    }

    #[test]
    fn test_compact_log_preserves_state() {
        let mut store = make_store();
        (0..5).for_each(|i| {
            store.insert(format!("k{}", i), vec![i as f32]).unwrap();
        });
        store.update("k0".to_string(), vec![99.0]).unwrap();

        store.compact_log();
        assert_eq!(store.get("k0").unwrap().vector, vec![99.0]);
        assert_eq!(store.len(), 5);
    }

    // ---- misc -----------------------------------------------------------

    #[test]
    fn test_keys_returns_all_live_keys() {
        let mut store = store_with(&[("a", 1.0), ("b", 2.0), ("c", 3.0)]);
        store.delete("b").unwrap();

        // Map iteration order is unspecified, so sort before comparing.
        let mut live = store.keys();
        live.sort();
        assert_eq!(live, vec!["a", "c"]);
    }

    #[test]
    fn test_insert_with_metadata_stored() {
        let mut store = make_store();
        let mut meta = HashMap::new();
        meta.insert("source".to_string(), "test".to_string());

        store
            .insert_with_metadata("k".to_string(), vec![1.0], meta.clone())
            .unwrap();

        let stored = store.get("k").unwrap();
        assert_eq!(
            stored.metadata.get("source").map(String::as_str),
            Some("test")
        );
    }
}