diamond_types/list/
eq.rs

// This file contains an implementation of Eq / PartialEq for OpLog. The implementation is quite
// complex because:
//
// - Operation logs don't have a canonical ordering (because of bubbles)
// - Internal agent IDs are arbitrary.
//
// This implementation of Eq is mostly designed to help fuzz testing. It is not optimized for
// performance.
9
10use rle::{HasLength, SplitableSpan};
11use rle::zip::rle_zip3;
12use crate::{ROOT_AGENT, Time};
13use crate::list::OpLog;
14use crate::frontier::clean_version;
15use crate::history::MinimalHistoryEntry;
16use crate::rle::KVPair;
17
/// Debug switch for `OpLog::eq`. When set to `true`, several of the "not equal" exits print the
/// reason to stdout before returning `false`. Leave it `false` outside of debugging sessions.
const VERBOSE: bool = false;
20
21impl PartialEq<Self> for OpLog {
22    fn eq(&self, other: &Self) -> bool {
23        if self.doc_id != other.doc_id { return false; }
24
25        // This implementation is based on the equivalent version in the original diamond types
26        // implementation.
27
28        // Fields to check:
29        // - [x] client_with_localtime, client_data,
30        // - [x] operations (+ ins_content / del_content)
31        // - [x] history
32        // - [x] frontier
33
34        // This check isn't sufficient. We'll check the frontier entries more thoroughly below.
35        if self.version.len() != other.version.len() { return false; }
36
37        // [self.agent] => other.agent.
38        let mut agent_a_to_b = Vec::new();
39        for c in self.client_data.iter() {
40            // If there's no corresponding client in other (and the agent is actually in use), the
41            // oplogs don't match.
42            let other_agent = if let Some(other_agent) = other.get_agent_id(&c.name) {
43                if other.client_data[other_agent as usize].get_next_seq() != c.get_next_seq() {
44                    // Make sure we have exactly the same number of edits for each agent.
45                    return false;
46                }
47
48                other_agent
49            } else {
50                #[allow(clippy::collapsible_else_if)]
51                if c.is_empty() {
52                    ROOT_AGENT // Just using this as a placeholder. Could use None but its awkward.
53                } else {
54                    // Agent missing.
55                    if VERBOSE {
56                        println!("Oplog does not match because agent ID is missing");
57                    }
58                    return false;
59                }
60            };
61            agent_a_to_b.push(other_agent);
62        }
63
64        let map_time_to_other = |t: Time| -> Option<Time> {
65            let mut crdt_id = self.time_to_crdt_id(t);
66            crdt_id.agent = agent_a_to_b[crdt_id.agent as usize];
67            other.try_crdt_id_to_time(crdt_id)
68        };
69
70        // Check frontier contents. Note this is O(n^2) with the size of the respective frontiers.
71        // Which should be fine in normal use, but its a DDOS risk.
72        for t in &self.version {
73            let other_time = map_time_to_other(*t);
74            if let Some(other_time) = other_time {
75                if !other.version.contains(&other_time) {
76                    if VERBOSE { println!("Frontier is not contained by other frontier"); }
77                    return false;
78                }
79            } else {
80                // The time is unknown.
81                if VERBOSE { println!("Frontier is not known in other doc"); }
82                return false;
83            }
84        }
85
86        // The core strategy here is we'll iterate through our local operations and make sure they
87        // each have a corresponding operation in other. Because self.len == other.len, this will be
88        // sufficient.
89
90        // The other approach here would be to go through each agent in self.clients and scan the
91        // corresponding changes in other.
92
93        // Note this should be optimized if its going to be used for more than fuzz testing.
94        // But this is pretty neat!
95        for (mut op, mut txn, mut crdt_id) in rle_zip3(
96            self.iter(),
97            self.iter_history(),
98            self.client_with_localtime.iter().map(|pair| pair.1)
99        ) {
100
101            // println!("op {:?} txn {:?} crdt {:?}", op, txn, crdt_id);
102
103            // Unfortunately the operation range we found might be split up in other. We'll loop
104            // grabbing as much of it as we can at a time.
105            loop {
106                // Look up the corresponding operation in other.
107
108                // This maps via agents - so I think that sort of implicitly checks out.
109                let other_time = if let Some(other_time) = map_time_to_other(txn.span.start) {
110                    other_time
111                } else { return false; };
112
113                // Lets take a look at the operation.
114                let (KVPair(_, other_op_int), offset) = other.operations.find_packed_with_offset(other_time);
115
116                let mut other_op = other_op_int.to_operation(other);
117                if offset > 0 { other_op.truncate_keeping_right(offset); }
118
119                // Although op is contiguous, and all in a run from the same agent, the same isn't
120                // necessarily true of other_op! The max length we can consume here is limited by
121                // other_op's size in agent assignments.
122                let (run, offset) = other.client_with_localtime.find_packed_with_offset(other_time);
123                let mut other_id = run.1;
124                if offset > 0 { other_id.truncate_keeping_right(offset); }
125
126                if agent_a_to_b[crdt_id.agent as usize] != other_id.agent {
127                    if VERBOSE { println!("Ops do not match because agents differ"); }
128                    return false;
129                }
130                if crdt_id.seq_range.start != other_id.seq_range.start {
131                    if VERBOSE { println!("Ops do not match because CRDT sequence numbers differ"); }
132                    return false;
133                }
134
135                let len_here = usize::min(other_op.len(), usize::min(op.len(), other_id.len()));
136                if other_op.len() > len_here {
137                    other_op.truncate(len_here);
138                }
139
140                let remainder = if op.len() > len_here {
141                    Some(op.truncate(len_here))
142                } else { None };
143
144                if op != other_op {
145                    if VERBOSE { println!("Ops do not match at {}:\n{:?}\n{:?}", txn.span.start, op, other_op); }
146                    return false;
147                }
148
149                // Ok, and we also need to check the txns match.
150                let (other_txn_entry, offset) = other.history.entries.find_packed_with_offset(other_time);
151                let mut other_txn: MinimalHistoryEntry = other_txn_entry.clone().into();
152                if offset > 0 { other_txn.truncate_keeping_right(offset); }
153                if other_txn.len() > len_here {
154                    other_txn.truncate(len_here);
155                }
156
157                // We can't just compare txns because the parents need to be mapped!
158                let mapped_start = if let Some(mapped) = map_time_to_other(txn.span.start) {
159                    mapped
160                } else {
161                    panic!("I think this should be unreachable, since we check the agent / seq matches above.");
162                    // return false;
163                };
164
165                let mut mapped_txn = MinimalHistoryEntry {
166                    span: (mapped_start..mapped_start + len_here).into(),
167                    // .unwrap() should be safe here because we've already walked past this item's
168                    // parents.
169                    parents: txn.parents.iter().map(|t| map_time_to_other(*t).unwrap()).collect()
170                };
171                // mapped_txn.parents.sort_unstable();
172                clean_version(&mut mapped_txn.parents);
173
174                if other_txn != mapped_txn {
175                    if VERBOSE { println!("Txns do not match {:?} (was {:?}) != {:?}", mapped_txn, txn, other_txn); }
176                    return false;
177                }
178
179                if let Some(rem) = remainder {
180                    op = rem;
181                } else { break; }
182                crdt_id.seq_range.start += len_here;
183                txn.truncate_keeping_right(len_here);
184            }
185        }
186
187        true
188    }
189}
190
191impl Eq for OpLog {}
192
193
#[cfg(test)]
mod test {
    use crate::list::OpLog;

    /// Compares `a` and `b` in both directions and returns the result. Fails the test if the two
    /// directions disagree, since equality must be symmetric.
    fn is_eq(a: &OpLog, b: &OpLog) -> bool {
        let a_eq_b = a.eq(b);
        let b_eq_a = b.eq(a);
        assert_eq!(
            a_eq_b, b_eq_a,
            "OpLog::eq is not symmetric: a == b is {}, but b == a is {}", a_eq_b, b_eq_a
        );
        a_eq_b
    }

    #[test]
    fn eq_smoke_test() {
        let mut a = OpLog::new();
        assert!(is_eq(&a, &a));
        a.get_or_create_agent_id("seph");
        a.get_or_create_agent_id("mike");
        a.add_insert_at(0, &[], 0, "Aa");
        a.add_insert_at(1, &[], 0, "b");
        a.add_delete_at(0, &[1, 2], 0..2);

        // Same history, different order. The agents are registered in the opposite order too, so
        // the local agent IDs differ between a and b.
        let mut b = OpLog::new();
        b.get_or_create_agent_id("mike");
        b.get_or_create_agent_id("seph");
        b.add_insert_at(0, &[], 0, "b");
        b.add_insert_at(1, &[], 0, "Aa");
        b.add_delete_at(1, &[0, 2], 0..2);

        assert!(is_eq(&a, &b));

        // And now with the edits interleaved, so the "Aa" run in a is split into two pieces in c.
        let mut c = OpLog::new();
        c.get_or_create_agent_id("seph");
        c.get_or_create_agent_id("mike");
        c.add_insert_at(0, &[], 0, "A");
        c.add_insert_at(1, &[], 0, "b");
        c.add_insert_at(0, &[0], 1, "a");
        c.add_delete_at(0, &[1, 2], 0..2);

        assert!(is_eq(&a, &c));
        assert!(is_eq(&b, &c));
    }

    #[test]
    fn eq_detects_different_edit_counts() {
        // Same agent name in both, but a has one more inserted item than b. The per-agent edit
        // count check should reject this in both directions.
        let mut a = OpLog::new();
        a.get_or_create_agent_id("seph");
        a.add_insert_at(0, &[], 0, "Aa");

        let mut b = OpLog::new();
        b.get_or_create_agent_id("seph");
        b.add_insert_at(0, &[], 0, "A");

        assert!(!is_eq(&a, &b));
    }
}