kv/
kv.rs

1use logical_clock::{VersionVector, Dot};
2use std::collections::HashMap;
3
/// Key type used to index the store's map; an alias for an owned `String`.
type Key = String;
5
6#[derive(Clone, Debug)]
7struct Value{
8    val:i64,
9    dot:Dot
10}
11
12struct KVStore {
13    store:HashMap<Key, Vec<Value>>,
14    vv:VersionVector,
15	
16}
17
18impl KVStore {
19    fn new() -> KVStore {
20	KVStore{
21	    store: HashMap::new(),
22	    vv: VersionVector::new(),
23	}
24    }
25
26    fn get(&self, key: &str) -> (Option<Vec<Value>>, VersionVector) {
27	match self.store.get(key) {
28	    None => (None, self.vv.clone()),
29	    Some(v) => (Some(v.clone()), self.vv.clone())
30	}
31    }
32
33    fn set(mut self, client_id:&str, context: &VersionVector, key: &str, val: i64) -> Self{
34	// if incoming request context descends from local clock, just overwrite.
35	if context.descends(&self.vv) {
36	    self.vv = self.vv.inc(client_id);
37	    let dot = self.vv.get_dot(client_id);
38	    let new_obj = Value{val: val, dot: dot};
39	    
40	    // overwrite all the siblings
41	    self.store.insert(key.to_string(), vec![new_obj]);
42	    return self
43	}
44
45	let mut frontier = self.vv.merge(&context);
46	frontier = frontier.inc(client_id);
47	let dot = frontier.get_dot(client_id);
48	let new_obj = Value{val: val, dot: dot};
49	self.vv = frontier;
50	return self.merge_siblings(key, new_obj)
51    }
52
53    fn merge_siblings(mut self, key: &str, new_val: Value) -> Self{
54	// replace values that dominated by given value's dot
55	let (old, _) = self.get(key);
56
57	match old {
58	    None => {
59		self.store.insert(key.to_string(), vec![new_val]);
60		return self
61	    },
62	    Some(values) => {
63		let mut updated = Vec::new();
64		for v in values {
65		    if new_val.dot.descends(&v.dot) {
66			continue;
67		    }
68		    updated.push(v);
69		}
70		updated.push(new_val);
71		self.store.insert(key.to_string(), updated);
72		return self
73	    }
74	}
75    }
76}
77
78fn main() {
79    let mut kv = KVStore::new();
80
81    // always get before put - Semantics followed in any High Available Key value store
82
83    // Client A and Client B
84    let (_, ctx_a) = kv.get("x");
85    let (_, ctx_b) = kv.get("x");
86
87    
88    kv = kv.set("A", &ctx_a, "x", 10); // A try to write x=10 with empty context
89    kv = kv.set("B", &ctx_b, "x", 15); // B try to write x=12 with same empty context
90
91    // both are concurrent from the client views, so both values should be kept
92    assert_eq!(2, kv.store["x"].len());
93
94    // Client C comes in.
95    let (_, ctx_c) = kv.get("x");
96    // now client C knows all the causal contex, so it replaces the key with all causal past.
97    kv = kv.set("C", &ctx_c, "x", 20);
98    assert_eq!(1, kv.store["x"].len());
99    
100    // now B set with old empty context.
101    kv = kv.set("B", &ctx_b, "x", 30); // I know contex is empty just set it as 30.
102
103    // From client views latest B write is concurrent to C latest write. so both should be kept.
104    assert_eq!(2, kv.store["x"].len());
105    
106    for (k, v) in kv.store {
107	println!("key: {}, values: {:?}", k, v)
108	    // val: {}, dot: {:?}", k, v, v.dot);
109    }
110    println!("vv: {:?}", kv.vv);
111
112}