correlationvector_rust/
correlationvector.rs

1use std::{
2    convert::TryFrom,
3    fmt::{Display, Formatter},
4    num::ParseIntError,
5    time::SystemTime,
6};
7
8use uuid::Uuid;
9
10use crate::{
11    correlationvectorparsererror::CorrelationVectorParseError,
12    spinparams::{generate_entropy, tick_periodicity_bits, ticks_to_drop, SpinParams},
13};
14
15const TERMINATION_SYMBOL: &str = "!";
16
17/// The Correlation Vector struct
18#[derive(Eq, PartialEq, Debug)]
19pub struct CorrelationVector {
20    base: String,
21    vector: Vec<u32>,
22    immutable: bool,
23    serialized_length: usize,
24}
25
26impl CorrelationVector {
27    /// Creates a new CorrelationVector with a randomly generated UUID.
28    pub fn new() -> CorrelationVector {
29        Self::new_from_uuid(Uuid::new_v4())
30    }
31
32    /// Create a new CorrelationVector from a given UUID.
33    pub fn new_from_uuid(base: Uuid) -> CorrelationVector {
34        let mut base_string = base64::encode(base.as_bytes());
35        while let Some(c) = base_string.pop() {
36            if c != '=' {
37                base_string.push(c);
38                break;
39            }
40        }
41        base_string.shrink_to_fit();
42        let base_str_len = base_string.len();
43        CorrelationVector {
44            base: base_string,
45            vector: vec![0],
46            immutable: false,
47            serialized_length: base_str_len + 2,
48        }
49    }
50
51    /// Create a new CorrelationVector struct from a string representation of a CorrelationVector.
52    pub fn parse(input: &str) -> Result<CorrelationVector, CorrelationVectorParseError> {
53        if input.len() > 128 || (input.len() == 128 && !input.ends_with(TERMINATION_SYMBOL)) {
54            return Err(CorrelationVectorParseError::StringTooLongError);
55        }
56
57        let mut input = input;
58        if input.ends_with(TERMINATION_SYMBOL) {
59            input = input.trim_end_matches(TERMINATION_SYMBOL);
60        }
61
62        let parts = input
63            .split('.')
64            .collect::<Vec<&str>>();
65        match *parts.as_slice() {
66            [base, _first, ..] => Ok(CorrelationVector {
67                base: base.to_string(),
68                vector: parts[1..]
69                    .iter()
70                    .map(|s| s.parse::<u32>())
71                    .collect::<Result<Vec<u32>, ParseIntError>>()?,
72                immutable: input.ends_with(TERMINATION_SYMBOL),
73                serialized_length: input.len(),
74            }),
75            [_] => Err(CorrelationVectorParseError::MissingVector),
76            [] => Err(CorrelationVectorParseError::Empty),
77        }
78    }
79
80    /// Append a new clock to the end of the vector clock
81    pub fn extend(&mut self) {
82        if self.immutable {
83            return;
84        }
85        let proposed_len = self.serialized_length + 2;
86        if proposed_len > 127 {
87            self.immutable = true;
88            return;
89        }
90        self.vector.push(0);
91        self.serialized_length = proposed_len; // .0
92    }
93
94    /// Increment the latest clock in the vector clock
95    pub fn increment(&mut self) {
96        if self.immutable {
97            return;
98        }
99        let last_index = self.vector.len() - 1;
100        let prev = self.vector[last_index];
101
102        // if the last digit is 9, the serialized length will increase
103        if prev % 10 == 9 {
104            if self.serialized_length < 127 {
105                self.serialized_length += 1;
106            } else {
107                self.immutable = true;
108            }
109        }
110
111        if !self.immutable {
112            self.vector[last_index] = prev + 1;
113        }
114    }
115
116    /// Transform the vector clock in a unique, monotonically increasing way. 
117    /// This is mostly used in situations where increment can not guaranatee uniqueness
118    pub fn spin(&mut self, params: SpinParams) {
119        if self.immutable {
120            return;
121        }
122        let entropy = generate_entropy(params.spin_entropy);
123        let ticks = SystemTime::now()
124            .duration_since(SystemTime::UNIX_EPOCH)
125            .expect("Time is before the 0 epoch")
126            .as_nanos()
127            / 100;
128
129        let mut value = u64::try_from(ticks >> ticks_to_drop(params.spin_counter_interval))
130            .expect("Number of ticks did not fit in u64");
131
132        for byte in entropy {
133            value = (value << 8) | u64::from(byte);
134        }
135
136        let tick_bitmask_bits = tick_periodicity_bits(params);
137        let mask = if tick_bitmask_bits == 64 {
138            0
139        } else {
140            (1 << tick_bitmask_bits) - 1
141        };
142
143        value &= mask;
144
145        let first_32_bits = value as u32;
146        let proposed_extension_len = serialized_length_of(first_32_bits) + 1;
147        if self.serialized_length + proposed_extension_len > 127 {
148            self.immutable = true;
149            return;
150        }
151        self.serialized_length += proposed_extension_len;
152        self.vector.push(first_32_bits);
153        if tick_bitmask_bits > 32 {
154            let end_32_bits = (value >> 32) as u32;
155            let proposed_extension_len = serialized_length_of(end_32_bits) + 1;
156            if self.serialized_length + proposed_extension_len > 127 {
157                self.immutable = true;
158                return;
159            }
160            self.vector.push(end_32_bits);
161            self.serialized_length += proposed_extension_len;
162        }
163
164        if self.serialized_length + 2 > 127 {
165            self.immutable = true;
166            return;
167        }
168
169        self.vector.push(0);
170        self.serialized_length += 2;
171    }
172}
173
174fn serialized_length_of(input: u32) -> usize {
175    let mut length = 1;
176    let mut input = input;
177    while input > 10 {
178        length += 1;
179        input /= 10;
180    }
181    length
182}
183
184impl Default for CorrelationVector {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl Display for CorrelationVector {
191    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
192        let vector_string: String = self
193            .vector
194            .iter()
195            .map(|i| i.to_string())
196            .collect::<Vec<String>>()
197            .join(".");
198        write!(
199            f,
200            "{}.{}{}",
201            self.base,
202            vector_string,
203            if self.immutable { "!" } else { "" }
204        )
205    }
206}
207
208#[cfg(test)]
209mod tests {
210
211    use crate::spinparams::{SpinCounterInterval, SpinCounterPeriodicity, SpinEntropy};
212
213    use super::*;
214
215    #[test]
216    fn generate_cv() {
217        let cv = CorrelationVector::new();
218        let cv_string = cv.to_string();
219        assert_eq!(cv_string.split('.').count(), 2);
220    }
221
222    #[test]
223    fn parse_cv_works() {
224        let cv = CorrelationVector::new();
225        let cv_string = cv.to_string();
226        let cv_parsed = CorrelationVector::parse(&cv_string);
227        assert_eq!(cv, cv_parsed.expect("Failed to parse cV"));
228    }
229
230    #[test]
231    fn increment_cv() {
232        let mut cv = CorrelationVector::new();
233        cv.increment();
234        let cv_string = cv.to_string();
235        assert!(cv_string.ends_with('1'));
236    }
237
238    #[test]
239    fn extend_cv() {
240        let mut cv = CorrelationVector::new();
241        cv.extend();
242        let cv_string = cv.to_string();
243        assert_eq!(cv_string.split('.').count(), 3);
244    }
245
246    #[test]
247    fn spin_cv() {
248        let mut cv = CorrelationVector::new();
249        cv.spin(SpinParams {
250            spin_entropy: SpinEntropy::Two,
251            spin_counter_interval: SpinCounterInterval::Fine,
252            spin_counter_periodicity: SpinCounterPeriodicity::Short,
253        });
254
255        let cv_string = cv.to_string();
256        assert!(cv_string.ends_with('0'));
257    }
258
259    #[test]
260    fn extend_stops_when_oversize() {
261        let mut cv = CorrelationVector::new();
262        for _ in 0..128 {
263            cv.extend();
264        }
265        let cv_string = cv.to_string();
266        assert!(cv_string.len() <= 128);
267        assert!(cv_string.ends_with(TERMINATION_SYMBOL));
268    }
269
270    #[test]
271    fn spin_stops_when_oversize() {
272        let mut cv = CorrelationVector::new();
273        for _ in 0..128 {
274            cv.spin(SpinParams {
275                spin_entropy: SpinEntropy::Two,
276                spin_counter_interval: SpinCounterInterval::Fine,
277                spin_counter_periodicity: SpinCounterPeriodicity::Short,
278            });
279        }
280        let cv_string = cv.to_string();
281        assert!(cv_string.len() <= 128, "{}", cv_string.len());
282        assert!(cv_string.ends_with(TERMINATION_SYMBOL));
283        println!("{}", cv_string);
284    }
285
286    #[test]
287    fn increment_stops_when_oversize() {
288        let mut cv = CorrelationVector::parse(
289            "P9v1ltK2S7qTS77z0lWtKg.0.386394219.0.386383989.0.386344389.0.386372594.0.386391233.0.386360320.0\
290            .386386342.0.386341105.12344459"
291        ).unwrap();
292
293        cv.increment();
294
295        let cv_string = cv.to_string();
296        assert_eq!(cv_string.len(), 128);
297        assert!(cv_string.ends_with(TERMINATION_SYMBOL));
298    }
299
300    #[test]
301    fn parse_terminated() {
302        let res = CorrelationVector::parse("base.0!");
303        assert!(res.is_ok(), "{:?}", res);
304    }
305}