yrs/
state_vector.rs

1use crate::block::ClientID;
2use crate::encoding::read::Error;
3use crate::updates::decoder::{Decode, Decoder};
4use crate::updates::encoder::{Encode, Encoder};
5use crate::utils::client_hasher::ClientHasher;
6use crate::{DeleteSet, ID};
7use std::cmp::Ordering;
8use std::collections::hash_map::Entry;
9use std::collections::HashMap;
10use std::hash::BuildHasherDefault;
11use std::iter::FromIterator;
12
13/// State vector is a compact representation of all known blocks inserted and integrated into
14/// a given document. This descriptor can be serialized and used to determine a difference between
15/// seen and unseen inserts of two replicas of the same document, potentially existing in different
16/// processes.
17///
18/// Another popular name for the concept represented by state vector is
19/// [Version Vector](https://en.wikipedia.org/wiki/Version_vector).
20#[derive(Default, Debug, Clone, PartialEq, Eq)]
21pub struct StateVector(HashMap<ClientID, u32, BuildHasherDefault<ClientHasher>>);
22
23impl StateVector {
24    /// Checks if current state vector contains any data.
25    pub fn is_empty(&self) -> bool {
26        self.0.is_empty()
27    }
28
29    /// Returns a number of unique clients observed by a document, current state vector corresponds
30    /// to.
31    pub fn len(&self) -> usize {
32        self.0.len()
33    }
34
35    pub fn new(map: HashMap<ClientID, u32, BuildHasherDefault<ClientHasher>>) -> Self {
36        StateVector(map)
37    }
38
39    /// Checks if current state vector includes given block identifier. Blocks, which identifiers
40    /// can be found in a state vectors don't need to be encoded as part of an update, because they
41    /// were already observed by their remote peer, current state vector refers to.
42    pub fn contains(&self, id: &ID) -> bool {
43        id.clock <= self.get(&id.client)
44    }
45
46    pub fn contains_client(&self, client_id: &ClientID) -> bool {
47        self.0.contains_key(client_id)
48    }
49
50    /// Get the latest clock sequence number value for a given `client_id` as observed from
51    /// the perspective of a current state vector.
52    pub fn get(&self, client_id: &ClientID) -> u32 {
53        match self.0.get(client_id) {
54            Some(state) => *state,
55            None => 0,
56        }
57    }
58
59    /// Updates a state vector observed clock sequence number for a given `client` by incrementing
60    /// it by a given `delta`.
61    pub fn inc_by(&mut self, client: ClientID, delta: u32) {
62        if delta > 0 {
63            let e = self.0.entry(client).or_default();
64            *e = *e + delta;
65        }
66    }
67
68    /// Updates a state vector observed clock sequence number for a given `client` by setting it to
69    /// a minimum value between an already present one and the provided `clock`. In case if state
70    /// vector didn't contain any value for that `client`, a `clock` value will be used.
71    pub fn set_min(&mut self, client: ClientID, clock: u32) {
72        match self.0.entry(client) {
73            Entry::Occupied(e) => {
74                let value = e.into_mut();
75                *value = (*value).min(clock);
76            }
77            Entry::Vacant(e) => {
78                e.insert(clock);
79            }
80        }
81    }
82
83    /// Updates a state vector observed clock sequence number for a given `client` by setting it to
84    /// a maximum value between an already present one and the provided `clock`. In case if state
85    /// vector didn't contain any value for that `client`, a `clock` value will be used.
86    pub fn set_max(&mut self, client: ClientID, clock: u32) {
87        let e = self.0.entry(client).or_default();
88        *e = (*e).max(clock);
89    }
90
91    /// Returns an iterator which enables to traverse over all clients and their known clock values
92    /// described by a current state vector.
93    pub fn iter(&self) -> std::collections::hash_map::Iter<ClientID, u32> {
94        self.0.iter()
95    }
96
97    /// Merges another state vector into a current one. Since vector's clock values can only be
98    /// incremented, whenever a conflict between two states happen (both state vectors have
99    /// different clock values for the same client entry), a highest of these to is considered to
100    /// be the most up-to-date.
101    pub fn merge(&mut self, other: Self) {
102        for (client, clock) in other.0 {
103            let e = self.0.entry(client).or_default();
104            *e = (*e).max(clock);
105        }
106    }
107}
108
109impl FromIterator<(ClientID, u32)> for StateVector {
110    fn from_iter<T: IntoIterator<Item = (ClientID, u32)>>(iter: T) -> Self {
111        StateVector::new(
112            HashMap::<ClientID, u32, BuildHasherDefault<ClientHasher>>::from_iter(iter),
113        )
114    }
115}
116
117impl Decode for StateVector {
118    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
119        let len = decoder.read_var::<u32>()? as usize;
120        let mut sv = HashMap::with_capacity_and_hasher(len, BuildHasherDefault::default());
121        let mut i = 0;
122        while i < len {
123            let client = decoder.read_var()?;
124            let clock = decoder.read_var()?;
125            sv.insert(client, clock);
126            i += 1;
127        }
128        Ok(StateVector(sv))
129    }
130}
131
132impl Encode for StateVector {
133    fn encode<E: Encoder>(&self, encoder: &mut E) {
134        encoder.write_var(self.len());
135        for (&client, &clock) in self.iter() {
136            encoder.write_var(client);
137            encoder.write_var(clock);
138        }
139    }
140}
141
142impl PartialOrd for StateVector {
143    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
144        let mut result = Some(Ordering::Equal);
145
146        for (client, clock) in self.iter() {
147            let other_clock = other.get(client);
148            match clock.cmp(&other_clock) {
149                Ordering::Less if result == Some(Ordering::Greater) => return None,
150                Ordering::Greater if result == Some(Ordering::Less) => return None,
151                Ordering::Equal => { /* unchanged */ }
152                other => result = Some(other),
153            }
154        }
155
156        for (other_client, other_clock) in other.iter() {
157            let clock = self.get(other_client);
158            match clock.cmp(&other_clock) {
159                Ordering::Less if result == Some(Ordering::Greater) => return None,
160                Ordering::Greater if result == Some(Ordering::Less) => return None,
161                Ordering::Equal => { /* unchanged */ }
162                other => result = Some(other),
163            }
164        }
165
166        result
167    }
168}
169
170/// Snapshot describes a state of a document store at a given point in (logical) time. In practice
171/// it's a combination of [StateVector] (a summary of all observed insert/update operations)
172/// and a [DeleteSet] (a summary of all observed deletions).
173#[derive(Default, Debug, Clone, PartialEq, Eq)]
174pub struct Snapshot {
175    /// Compressed information about all deleted blocks at current snapshot time.
176    pub delete_set: DeleteSet,
177    /// Logical clock describing a current snapshot time.
178    pub state_map: StateVector,
179}
180
181impl Snapshot {
182    pub fn new(state_map: StateVector, delete_set: DeleteSet) -> Self {
183        Snapshot {
184            state_map,
185            delete_set,
186        }
187    }
188
189    pub(crate) fn is_visible(&self, id: &ID) -> bool {
190        self.state_map.get(&id.client) > id.clock && !self.delete_set.is_deleted(id)
191    }
192}
193
194impl Encode for Snapshot {
195    fn encode<E: Encoder>(&self, encoder: &mut E) {
196        self.delete_set.encode(encoder);
197        self.state_map.encode(encoder)
198    }
199}
200
201impl Decode for Snapshot {
202    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
203        let ds = DeleteSet::decode(decoder)?;
204        let sm = StateVector::decode(decoder)?;
205        Ok(Snapshot::new(sm, ds))
206    }
207}
208
#[cfg(test)]
mod test {
    use crate::{Doc, ReadTxn, StateVector, Text, Transact, WriteTxn};
    use std::cmp::Ordering;
    use std::iter::FromIterator;

    #[test]
    fn ordering() {
        // Three-client vectors, each compared against the fixed baseline (1, 2, 3).
        let triple = |a: u32, b: u32, c: u32| StateVector::from_iter([(1, a), (2, b), (3, c)]);
        let baseline = triple(1, 2, 3);

        let cases = [
            (triple(1, 2, 3), Some(Ordering::Equal)),
            (triple(1, 2, 2), Some(Ordering::Less)),
            (triple(2, 2, 3), Some(Ordering::Greater)),
            (triple(3, 2, 1), None),
        ];
        for (candidate, expected) in &cases {
            assert_eq!(candidate.partial_cmp(&baseline), *expected);
        }
    }

    #[test]
    fn ordering_missing_fields() {
        // Each case is (left, right, expected `left.partial_cmp(right)`). Clients absent from a
        // vector are compared as clock 0.
        let cases = [
            (
                StateVector::from_iter([(1, 1), (2, 2)]),
                StateVector::from_iter([(2, 1), (3, 2)]),
                None,
            ),
            (
                StateVector::from_iter([(1, 1), (2, 2)]),
                StateVector::from_iter([(1, 1), (2, 1), (3, 2)]),
                None,
            ),
            (
                StateVector::from_iter([(1, 1), (2, 2), (3, 3)]),
                StateVector::from_iter([(2, 2), (3, 3)]),
                Some(Ordering::Greater),
            ),
            (
                StateVector::from_iter([(2, 2), (3, 2)]),
                StateVector::from_iter([(1, 1), (2, 2), (3, 2)]),
                Some(Ordering::Less),
            ),
            (
                StateVector::default(),
                StateVector::default(),
                Some(Ordering::Equal),
            ),
        ];
        for (left, right, expected) in &cases {
            assert_eq!(left.partial_cmp(right), *expected);
        }
    }

    #[test]
    fn ordering_one_of() {
        let doc = Doc::with_client_id(1);
        let mut txn = doc.transact_mut();
        let text = txn.get_or_insert_text("text");
        text.insert(&mut txn, 0, "a");

        // After a single local insert, the transaction's state is ahead of an empty vector.
        let after_insert = txn.state_vector();
        let empty = StateVector::default();
        assert_eq!(after_insert.partial_cmp(&empty), Some(Ordering::Greater));
    }
}