1use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub enum ChangeKind {
29 Insert,
31 Update,
33 Delete,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ChangeRecord {
42 pub seq: u64,
44 pub key: String,
46 pub kind: ChangeKind,
48 pub vector: Option<Vec<f32>>,
50 pub metadata: HashMap<String, String>,
52}
53
54impl ChangeRecord {
55 fn insert(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
56 Self {
57 seq,
58 key,
59 kind: ChangeKind::Insert,
60 vector: Some(vector),
61 metadata,
62 }
63 }
64
65 fn update(seq: u64, key: String, vector: Vec<f32>, metadata: HashMap<String, String>) -> Self {
66 Self {
67 seq,
68 key,
69 kind: ChangeKind::Update,
70 vector: Some(vector),
71 metadata,
72 }
73 }
74
75 fn delete(seq: u64, key: String) -> Self {
76 Self {
77 seq,
78 key,
79 kind: ChangeKind::Delete,
80 vector: None,
81 metadata: HashMap::new(),
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct StoredEntry {
91 pub vector: Vec<f32>,
93 pub metadata: HashMap<String, String>,
95 pub version: u64,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct StoreDelta {
104 pub source_seq: u64,
106 pub since_seq: u64,
108 pub changes: Vec<ChangeRecord>,
110}
111
112impl StoreDelta {
113 pub fn len(&self) -> usize {
115 self.changes.len()
116 }
117
118 pub fn is_empty(&self) -> bool {
120 self.changes.is_empty()
121 }
122}
123
124#[derive(Debug, Clone, Default, Serialize, Deserialize)]
128pub struct MergeResult {
129 pub inserts_applied: usize,
131 pub updates_applied: usize,
133 pub deletes_applied: usize,
135 pub conflicts_skipped: usize,
137}
138
139impl MergeResult {
140 pub fn total_applied(&self) -> usize {
142 self.inserts_applied + self.updates_applied + self.deletes_applied
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct DeltaStoreStats {
151 pub entry_count: usize,
153 pub current_seq: u64,
155 pub log_length: usize,
157 pub total_inserts: u64,
159 pub total_updates: u64,
161 pub total_deletes: u64,
163 pub total_merges: u64,
165}
166
167pub struct DeltaSyncVectorStore {
172 entries: HashMap<String, StoredEntry>,
174 change_log: Vec<ChangeRecord>,
176 seq: u64,
178 total_inserts: u64,
180 total_updates: u64,
181 total_deletes: u64,
182 total_merges: u64,
183}
184
185impl Default for DeltaSyncVectorStore {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl DeltaSyncVectorStore {
192 pub fn new() -> Self {
194 Self {
195 entries: HashMap::new(),
196 change_log: Vec::new(),
197 seq: 0,
198 total_inserts: 0,
199 total_updates: 0,
200 total_deletes: 0,
201 total_merges: 0,
202 }
203 }
204
205 pub fn insert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
209 self.insert_with_metadata(key, vector, HashMap::new())
210 }
211
212 pub fn insert_with_metadata(
214 &mut self,
215 key: String,
216 vector: Vec<f32>,
217 metadata: HashMap<String, String>,
218 ) -> Result<u64> {
219 if self.entries.contains_key(&key) {
220 return Err(anyhow!("Key '{}' already exists; use update()", key));
221 }
222 self.seq += 1;
223 let seq = self.seq;
224 let record = ChangeRecord::insert(seq, key.clone(), vector.clone(), metadata.clone());
225 self.change_log.push(record);
226 self.entries.insert(
227 key,
228 StoredEntry {
229 vector,
230 metadata,
231 version: seq,
232 },
233 );
234 self.total_inserts += 1;
235 Ok(seq)
236 }
237
238 pub fn update(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
240 self.update_with_metadata(key, vector, HashMap::new())
241 }
242
243 pub fn update_with_metadata(
245 &mut self,
246 key: String,
247 vector: Vec<f32>,
248 metadata: HashMap<String, String>,
249 ) -> Result<u64> {
250 if !self.entries.contains_key(&key) {
251 return Err(anyhow!("Key '{}' does not exist; use insert()", key));
252 }
253 self.seq += 1;
254 let seq = self.seq;
255 let record = ChangeRecord::update(seq, key.clone(), vector.clone(), metadata.clone());
256 self.change_log.push(record);
257 if let Some(entry) = self.entries.get_mut(&key) {
258 entry.vector = vector;
259 entry.metadata = metadata;
260 entry.version = seq;
261 }
262 self.total_updates += 1;
263 Ok(seq)
264 }
265
266 pub fn upsert(&mut self, key: String, vector: Vec<f32>) -> Result<u64> {
268 if self.entries.contains_key(&key) {
269 self.update(key, vector)
270 } else {
271 self.insert(key, vector)
272 }
273 }
274
275 pub fn delete(&mut self, key: &str) -> Result<u64> {
277 if !self.entries.contains_key(key) {
278 return Err(anyhow!("Key '{}' not found", key));
279 }
280 self.seq += 1;
281 let seq = self.seq;
282 let record = ChangeRecord::delete(seq, key.to_string());
283 self.change_log.push(record);
284 self.entries.remove(key);
285 self.total_deletes += 1;
286 Ok(seq)
287 }
288
289 pub fn get(&self, key: &str) -> Option<&StoredEntry> {
293 self.entries.get(key)
294 }
295
296 pub fn contains(&self, key: &str) -> bool {
298 self.entries.contains_key(key)
299 }
300
301 pub fn len(&self) -> usize {
303 self.entries.len()
304 }
305
306 pub fn is_empty(&self) -> bool {
308 self.entries.is_empty()
309 }
310
311 pub fn current_seq(&self) -> u64 {
313 self.seq
314 }
315
316 pub fn keys(&self) -> Vec<&str> {
318 self.entries.keys().map(String::as_str).collect()
319 }
320
321 pub fn export_delta(&self, since_seq: u64) -> StoreDelta {
327 let changes: Vec<ChangeRecord> = self
328 .change_log
329 .iter()
330 .filter(|r| r.seq > since_seq)
331 .cloned()
332 .collect();
333 StoreDelta {
334 source_seq: self.seq,
335 since_seq,
336 changes,
337 }
338 }
339
340 pub fn merge_delta(&mut self, delta: &StoreDelta) -> Result<MergeResult> {
349 let mut result = MergeResult::default();
350
351 for record in &delta.changes {
352 match &record.kind {
353 ChangeKind::Insert | ChangeKind::Update => {
354 let vector = record
355 .vector
356 .as_ref()
357 .ok_or_else(|| anyhow!("Insert/Update record missing vector data"))?
358 .clone();
359 let metadata = record.metadata.clone();
360
361 if let Some(existing) = self.entries.get(&record.key) {
362 if existing.version >= record.seq {
363 result.conflicts_skipped += 1;
365 continue;
366 }
367 if let Some(e) = self.entries.get_mut(&record.key) {
369 e.vector = vector;
370 e.metadata = metadata;
371 e.version = record.seq;
372 }
373 result.updates_applied += 1;
374 } else {
375 self.entries.insert(
377 record.key.clone(),
378 StoredEntry {
379 vector,
380 metadata,
381 version: record.seq,
382 },
383 );
384 if record.kind == ChangeKind::Insert {
385 result.inserts_applied += 1;
386 } else {
387 result.updates_applied += 1;
388 }
389 }
390 }
391 ChangeKind::Delete => {
392 if let Some(existing) = self.entries.get(&record.key) {
393 if existing.version >= record.seq {
394 result.conflicts_skipped += 1;
395 continue;
396 }
397 }
398 if self.entries.remove(&record.key).is_some() {
399 result.deletes_applied += 1;
400 }
401 }
402 }
403 }
404
405 self.total_merges += 1;
406 Ok(result)
407 }
408
409 pub fn stats(&self) -> DeltaStoreStats {
411 DeltaStoreStats {
412 entry_count: self.entries.len(),
413 current_seq: self.seq,
414 log_length: self.change_log.len(),
415 total_inserts: self.total_inserts,
416 total_updates: self.total_updates,
417 total_deletes: self.total_deletes,
418 total_merges: self.total_merges,
419 }
420 }
421
422 pub fn compact_log(&mut self) {
427 let mut last_seq_per_key: HashMap<String, usize> = HashMap::new();
429 for (idx, record) in self.change_log.iter().enumerate() {
430 last_seq_per_key.insert(record.key.clone(), idx);
431 }
432
433 let keep: std::collections::HashSet<usize> = last_seq_per_key.values().copied().collect();
434
435 let mut new_log = Vec::with_capacity(keep.len());
436 for (idx, record) in self.change_log.iter().enumerate() {
437 if keep.contains(&idx) {
438 new_log.push(record.clone());
439 }
440 }
441 new_log.sort_by_key(|r| r.seq);
442 self.change_log = new_log;
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Fresh, empty store used by every test.
    fn make_store() -> DeltaSyncVectorStore {
        DeltaSyncVectorStore::new()
    }

    // ---- basic operations -------------------------------------------------

    #[test]
    fn test_new_store_is_empty() {
        let fresh = make_store();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
        assert_eq!(fresh.current_seq(), 0);
    }

    #[test]
    fn test_insert_increments_seq() -> Result<()> {
        let mut s = make_store();
        let assigned = s.insert(String::from("k1"), vec![1.0, 2.0])?;
        assert_eq!(assigned, 1);
        assert_eq!(s.current_seq(), 1);
        assert_eq!(s.len(), 1);
        Ok(())
    }

    #[test]
    fn test_insert_duplicate_key_fails() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k1"), vec![1.0])?;
        let second = s.insert(String::from("k1"), vec![2.0]);
        assert!(second.is_err());
        Ok(())
    }

    #[test]
    fn test_update_existing_key() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k1"), vec![1.0, 0.0])?;
        let assigned = s.update(String::from("k1"), vec![2.0, 0.0])?;
        assert_eq!(assigned, 2);
        let stored = s.get("k1").expect("k1 not found");
        assert_eq!(stored.vector, vec![2.0, 0.0]);
        Ok(())
    }

    #[test]
    fn test_update_missing_key_fails() {
        let mut s = make_store();
        let outcome = s.update(String::from("nonexistent"), vec![1.0]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_delete_existing_key() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k1"), vec![1.0])?;
        let assigned = s.delete("k1")?;
        assert_eq!(assigned, 2);
        assert!(!s.contains("k1"));
        assert_eq!(s.len(), 0);
        Ok(())
    }

    #[test]
    fn test_delete_missing_key_fails() {
        let mut s = make_store();
        let outcome = s.delete("missing");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_upsert_insert_path() -> Result<()> {
        let mut s = make_store();
        let assigned = s.upsert(String::from("k"), vec![1.0])?;
        assert_eq!(assigned, 1);
        assert_eq!(s.len(), 1);
        Ok(())
    }

    #[test]
    fn test_upsert_update_path() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k"), vec![1.0])?;
        s.upsert(String::from("k"), vec![99.0])?;
        let stored = s.get("k").expect("k not found");
        assert_eq!(stored.vector, vec![99.0]);
        Ok(())
    }

    #[test]
    fn test_contains_after_insert() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("x"), vec![0.0])?;
        assert!(s.contains("x"));
        assert!(!s.contains("y"));
        Ok(())
    }

    // ---- change log -------------------------------------------------------

    #[test]
    fn test_change_log_grows_with_operations() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k1"), vec![1.0])?;
        s.insert(String::from("k2"), vec![2.0])?;
        s.update(String::from("k1"), vec![3.0])?;
        s.delete("k2")?;
        let snapshot = s.stats();
        assert_eq!(snapshot.log_length, 4);
        Ok(())
    }

    #[test]
    fn test_change_log_records_correct_kinds() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        s.update(String::from("a"), vec![2.0])?;
        s.delete("a")?;
        assert_eq!(s.change_log[0].kind, ChangeKind::Insert);
        assert_eq!(s.change_log[1].kind, ChangeKind::Update);
        assert_eq!(s.change_log[2].kind, ChangeKind::Delete);
        Ok(())
    }

    // ---- delta export -----------------------------------------------------

    #[test]
    fn test_export_delta_full() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        s.insert(String::from("b"), vec![2.0])?;
        let exported = s.export_delta(0);
        assert_eq!(exported.changes.len(), 2);
        assert_eq!(exported.source_seq, 2);
        Ok(())
    }

    #[test]
    fn test_export_delta_incremental() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        s.insert(String::from("b"), vec![2.0])?;
        // Only changes after sequence 1 are exported.
        let exported = s.export_delta(1);
        assert_eq!(exported.changes.len(), 1);
        assert_eq!(exported.changes[0].key, "b");
        Ok(())
    }

    #[test]
    fn test_export_delta_empty_when_up_to_date() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        // The requester has already seen sequence 1, the only change.
        let exported = s.export_delta(1);
        assert!(exported.is_empty());
        Ok(())
    }

    // ---- delta merge ------------------------------------------------------

    #[test]
    fn test_merge_delta_inserts_new_entries() -> Result<()> {
        let mut origin = make_store();
        origin.insert(String::from("remote_key"), vec![42.0])?;
        let exported = origin.export_delta(0);

        let mut receiver = make_store();
        let outcome = receiver.merge_delta(&exported)?;

        assert_eq!(outcome.inserts_applied, 1);
        assert!(receiver.contains("remote_key"));
        assert_eq!(
            receiver.get("remote_key").expect("test value").vector,
            vec![42.0]
        );
        Ok(())
    }

    #[test]
    fn test_merge_delta_deletes_entries() -> Result<()> {
        let mut receiver = make_store();
        receiver.insert(String::from("to_delete"), vec![1.0])?;

        // Remote delete with a sequence number far ahead of the local version.
        let incoming = StoreDelta {
            source_seq: 99,
            since_seq: 0,
            changes: vec![ChangeRecord::delete(99, String::from("to_delete"))],
        };

        let outcome = receiver.merge_delta(&incoming)?;
        assert_eq!(outcome.deletes_applied, 1);
        assert!(!receiver.contains("to_delete"));
        Ok(())
    }

    #[test]
    fn test_merge_delta_conflict_local_wins() -> Result<()> {
        let mut receiver = make_store();
        for i in 0..5 {
            receiver.insert(format!("k{}", i), vec![i as f32])?;
        }

        // "k0" was written locally at sequence 1, so a remote update carrying
        // sequence 1 is not newer and must be skipped.
        let incoming = StoreDelta {
            source_seq: 1,
            since_seq: 0,
            changes: vec![ChangeRecord::update(
                1,
                String::from("k0"),
                vec![999.0],
                HashMap::new(),
            )],
        };

        let outcome = receiver.merge_delta(&incoming)?;
        assert_eq!(outcome.conflicts_skipped, 1);
        assert_eq!(receiver.get("k0").expect("test value").vector, vec![0.0]);
        Ok(())
    }

    #[test]
    fn test_merge_delta_remote_wins_newer_seq() -> Result<()> {
        let mut receiver = make_store();
        receiver.insert(String::from("k"), vec![1.0])?;

        // Remote sequence 100 is newer than the local version 1.
        let incoming = StoreDelta {
            source_seq: 100,
            since_seq: 0,
            changes: vec![ChangeRecord::update(
                100,
                String::from("k"),
                vec![200.0],
                HashMap::new(),
            )],
        };

        let outcome = receiver.merge_delta(&incoming)?;
        assert_eq!(outcome.updates_applied, 1);
        assert_eq!(receiver.get("k").expect("test value").vector, vec![200.0]);
        Ok(())
    }

    #[test]
    fn test_merge_empty_delta_noop() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        let nothing = StoreDelta {
            source_seq: 0,
            since_seq: 0,
            changes: Vec::new(),
        };
        let outcome = s.merge_delta(&nothing)?;
        assert_eq!(outcome.total_applied(), 0);
        assert_eq!(s.len(), 1);
        Ok(())
    }

    #[test]
    fn test_merge_result_total_applied() -> Result<()> {
        let mut origin = make_store();
        origin.insert(String::from("a"), vec![1.0])?;
        origin.insert(String::from("b"), vec![2.0])?;
        let exported = origin.export_delta(0);

        let mut receiver = make_store();
        let outcome = receiver.merge_delta(&exported)?;
        assert_eq!(outcome.total_applied(), 2);
        Ok(())
    }

    // ---- stats ------------------------------------------------------------

    #[test]
    fn test_stats_counters() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        s.insert(String::from("b"), vec![2.0])?;
        s.update(String::from("a"), vec![10.0])?;
        s.delete("b")?;

        let snapshot = s.stats();
        assert_eq!(snapshot.total_inserts, 2);
        assert_eq!(snapshot.total_updates, 1);
        assert_eq!(snapshot.total_deletes, 1);
        assert_eq!(snapshot.entry_count, 1);
        assert_eq!(snapshot.current_seq, 4);
        Ok(())
    }

    #[test]
    fn test_stats_merge_counter() -> Result<()> {
        let mut origin = make_store();
        origin.insert(String::from("x"), vec![1.0])?;
        let exported = origin.export_delta(0);

        let mut receiver = make_store();
        receiver.merge_delta(&exported)?;
        let nothing = StoreDelta {
            source_seq: 0,
            since_seq: 0,
            changes: Vec::new(),
        };
        receiver.merge_delta(&nothing)?;

        assert_eq!(receiver.stats().total_merges, 2);
        Ok(())
    }

    // ---- log compaction ---------------------------------------------------

    #[test]
    fn test_compact_log_reduces_size() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("k"), vec![1.0])?;
        s.update(String::from("k"), vec![2.0])?;
        s.update(String::from("k"), vec![3.0])?;
        assert_eq!(s.stats().log_length, 3);
        s.compact_log();
        // Only the last record for "k" survives.
        assert_eq!(s.stats().log_length, 1);
        Ok(())
    }

    #[test]
    fn test_compact_log_preserves_state() -> Result<()> {
        let mut s = make_store();
        for i in 0..5 {
            s.insert(format!("k{}", i), vec![i as f32])?;
        }
        s.update(String::from("k0"), vec![99.0])?;
        s.compact_log();
        assert_eq!(s.get("k0").expect("test value").vector, vec![99.0]);
        assert_eq!(s.len(), 5);
        Ok(())
    }

    // ---- keys -------------------------------------------------------------

    #[test]
    fn test_keys_returns_all_live_keys() -> Result<()> {
        let mut s = make_store();
        s.insert(String::from("a"), vec![1.0])?;
        s.insert(String::from("b"), vec![2.0])?;
        s.insert(String::from("c"), vec![3.0])?;
        s.delete("b")?;
        let mut live = s.keys();
        live.sort();
        assert_eq!(live, vec!["a", "c"]);
        Ok(())
    }

    // ---- metadata ---------------------------------------------------------

    #[test]
    fn test_insert_with_metadata_stored() -> Result<()> {
        let mut s = make_store();
        let mut tags = HashMap::new();
        tags.insert(String::from("source"), String::from("test"));
        s.insert_with_metadata(String::from("k"), vec![1.0], tags.clone())?;
        let stored = s.get("k").expect("k not found");
        assert_eq!(
            stored.metadata.get("source").map(String::as_str),
            Some("test")
        );
        Ok(())
    }
}