// diamond-types 1.0.0
//
// The world's fastest text CRDT
// Documentation
// This file contains an implementation of Eq / PartialEq for OpLog. The implementation is quite
// complex because:
//
// - Operation logs don't have a canonical ordering (because of bubbles)
// - Internal agent IDs are arbitrary.
//
// This implementation of Eq is mostly designed to help fuzz testing. It is not optimized for
// performance.

use rle::{HasLength, SplitableSpan};
use rle::zip::rle_zip3;
use crate::{ROOT_AGENT, Time};
use crate::list::OpLog;
use crate::frontier::clean_version;
use crate::history::MinimalHistoryEntry;
use crate::rle::KVPair;

// Debug switch for the equality check below. When set to `true`, `OpLog::eq` prints the reason
// each time it decides two oplogs do not match. Swap the two lines to enable it.
// const VERBOSE: bool = true;
const VERBOSE: bool = false;

impl PartialEq<Self> for OpLog {
    /// Structural equality between two oplogs, independent of the local order in which
    /// operations were recorded and of the numeric agent IDs each oplog happens to use.
    ///
    /// The check proceeds in stages, returning `false` at the first mismatch:
    ///
    /// 1. `doc_id` and the number of frontier (`version`) entries must match.
    /// 2. Every agent of `self` is looked up by name in `other`, producing an agent ID
    ///    translation table. A non-empty agent missing from `other`, or an agent whose next
    ///    sequence number differs, means the oplogs differ.
    /// 3. Every frontier entry of `self`, translated into `other`'s time space, must be present
    ///    in `other`'s frontier.
    /// 4. Every (operation, history entry, CRDT id) run of `self` is compared piecewise against
    ///    the corresponding data in `other`, with parents translated through the same mapping.
    ///
    /// When `VERBOSE` is enabled, the reason for most mismatches is printed to stdout.
    ///
    /// # Panics
    ///
    /// Panics if a parent of a history entry in `self` cannot be translated into `other`. The
    /// entries are walked in order, so parents are expected to have been checked already.
    fn eq(&self, other: &Self) -> bool {
        if self.doc_id != other.doc_id { return false; }

        // This implementation is based on the equivalent version in the original diamond types
        // implementation.

        // Fields to check:
        // - [x] client_with_localtime, client_data,
        // - [x] operations (+ ins_content / del_content)
        // - [x] history
        // - [x] frontier

        // This check isn't sufficient. We'll check the frontier entries more thoroughly below.
        if self.version.len() != other.version.len() { return false; }

        // Translation table: agent_a_to_b[agent ID in self] => agent ID in other.
        let mut agent_a_to_b = Vec::new();
        for c in self.client_data.iter() {
            // If there's no corresponding client in other (and the agent is actually in use), the
            // oplogs don't match.
            let other_agent = if let Some(other_agent) = other.get_agent_id(&c.name) {
                if other.client_data[other_agent as usize].get_next_seq() != c.get_next_seq() {
                    // Make sure we have exactly the same number of edits for each agent.
                    return false;
                }

                other_agent
            } else {
                #[allow(clippy::collapsible_else_if)]
                if c.is_empty() {
                    // The agent has no edits, so its table slot is never read. ROOT_AGENT is
                    // just a placeholder. Could use None but its awkward.
                    ROOT_AGENT
                } else {
                    // Agent missing.
                    if VERBOSE {
                        println!("Oplog does not match because agent ID is missing");
                    }
                    return false;
                }
            };
            agent_a_to_b.push(other_agent);
        }

        // Translates a time in self into the matching time in other by going through the CRDT
        // id and swapping the agent via the table above. Yields None when other does not know
        // the resulting CRDT id.
        let map_time_to_other = |t: Time| -> Option<Time> {
            let mut crdt_id = self.time_to_crdt_id(t);
            crdt_id.agent = agent_a_to_b[crdt_id.agent as usize];
            other.try_crdt_id_to_time(crdt_id)
        };

        // Check frontier contents. Note this is O(n^2) with the size of the respective frontiers.
        // Which should be fine in normal use, but its a DDOS risk.
        for t in &self.version {
            let other_time = map_time_to_other(*t);
            if let Some(other_time) = other_time {
                if !other.version.contains(&other_time) {
                    if VERBOSE { println!("Frontier is not contained by other frontier"); }
                    return false;
                }
            } else {
                // The time is unknown.
                if VERBOSE { println!("Frontier is not known in other doc"); }
                return false;
            }
        }

        // The core strategy here is we'll iterate through our local operations and make sure they
        // each have a corresponding operation in other. The per-agent sequence number check and
        // the frontier check above are what this relies on to rule out extra operations in other.

        // The other approach here would be to go through each agent in self.clients and scan the
        // corresponding changes in other.

        // Note this should be optimized if its going to be used for more than fuzz testing.
        // But this is pretty neat!
        for (mut op, mut txn, mut crdt_id) in rle_zip3(
            self.iter(),
            self.iter_history(),
            self.client_with_localtime.iter().map(|pair| pair.1)
        ) {

            // println!("op {:?} txn {:?} crdt {:?}", op, txn, crdt_id);

            // Unfortunately the operation range we found might be split up in other. We'll loop
            // grabbing as much of it as we can at a time.
            loop {
                // Look up the corresponding operation in other.

                // This maps via agents - so I think that sort of implicitly checks out.
                let other_time = if let Some(other_time) = map_time_to_other(txn.span.start) {
                    other_time
                } else { return false; };

                // Lets take a look at the operation.
                let (KVPair(_, other_op_int), offset) = other.operations.find_packed_with_offset(other_time);

                let mut other_op = other_op_int.to_operation(other);
                if offset > 0 { other_op.truncate_keeping_right(offset); }

                // Although op is contiguous, and all in a run from the same agent, the same isn't
                // necessarily true of other_op! The max length we can consume here is limited by
                // other_op's size in agent assignments.
                let (run, offset) = other.client_with_localtime.find_packed_with_offset(other_time);
                let mut other_id = run.1;
                if offset > 0 { other_id.truncate_keeping_right(offset); }

                if agent_a_to_b[crdt_id.agent as usize] != other_id.agent {
                    if VERBOSE { println!("Ops do not match because agents differ"); }
                    return false;
                }
                if crdt_id.seq_range.start != other_id.seq_range.start {
                    if VERBOSE { println!("Ops do not match because CRDT sequence numbers differ"); }
                    return false;
                }

                // Only compare the prefix that is contiguous on both sides this iteration.
                let len_here = usize::min(other_op.len(), usize::min(op.len(), other_id.len()));
                if other_op.len() > len_here {
                    other_op.truncate(len_here);
                }

                // truncate() leaves the first len_here items in op and hands back the rest,
                // which is compared on the next pass through this loop.
                let remainder = if op.len() > len_here {
                    Some(op.truncate(len_here))
                } else { None };

                if op != other_op {
                    if VERBOSE { println!("Ops do not match at {}:\n{:?}\n{:?}", txn.span.start, op, other_op); }
                    return false;
                }

                // Ok, and we also need to check the txns match.
                let (other_txn_entry, offset) = other.history.entries.find_packed_with_offset(other_time);
                let mut other_txn: MinimalHistoryEntry = other_txn_entry.clone().into();
                if offset > 0 { other_txn.truncate_keeping_right(offset); }
                if other_txn.len() > len_here {
                    other_txn.truncate(len_here);
                }

                // We can't just compare txns because the parents need to be mapped!
                // The span start is txn.span.start mapped into other, which is exactly
                // other_time computed at the top of this iteration (txn is untouched since).
                let mut mapped_txn = MinimalHistoryEntry {
                    span: (other_time..other_time + len_here).into(),
                    // .unwrap() should be safe here because we've already walked past this item's
                    // parents.
                    parents: txn.parents.iter().map(|t| map_time_to_other(*t).unwrap()).collect()
                };
                // mapped_txn.parents.sort_unstable();
                clean_version(&mut mapped_txn.parents);

                if other_txn != mapped_txn {
                    if VERBOSE { println!("Txns do not match {:?} (was {:?}) != {:?}", mapped_txn, txn, other_txn); }
                    return false;
                }

                // Advance all three cursors past the prefix we just verified, or finish this run.
                if let Some(rem) = remainder {
                    op = rem;
                } else { break; }
                crdt_id.seq_range.start += len_here;
                txn.truncate_keeping_right(len_here);
            }
        }

        true
    }
}

// Marker impl: declares that the `PartialEq` comparison above is a full equivalence relation.
impl Eq for OpLog {}


#[cfg(test)]
mod test {
    use crate::list::OpLog;

    /// Evaluates equality in both directions, asserts that the two answers agree (equality has
    /// to be symmetric), and returns the shared answer.
    fn is_eq(a: &OpLog, b: &OpLog) -> bool {
        let a_eq_b = a == b;
        let b_eq_a = b == a;
        // Dump both results before the assertion fires so a symmetry failure is easy to read.
        if a_eq_b != b_eq_a { dbg!(a_eq_b, b_eq_a); }
        assert_eq!(a_eq_b, b_eq_a);
        a_eq_b
    }

    /// Builds the same logical history three ways (different agent registration order,
    /// different local operation order, and split / interleaved inserts) and checks that all
    /// three oplogs compare equal.
    #[test]
    fn eq_smoke_test() {
        // Baseline: "seph" is agent 0, "mike" is agent 1.
        let mut seph_first = OpLog::new();
        assert!(is_eq(&seph_first, &seph_first));
        seph_first.get_or_create_agent_id("seph");
        seph_first.get_or_create_agent_id("mike");
        seph_first.add_insert_at(0, &[], 0, "Aa");
        seph_first.add_insert_at(1, &[], 0, "b");
        seph_first.add_delete_at(0, &[1, 2], 0..2);

        // Same history, but the agents are registered (and their inserts recorded) in the
        // opposite order, so agent IDs and local times differ from the baseline.
        let mut mike_first = OpLog::new();
        mike_first.get_or_create_agent_id("mike");
        mike_first.get_or_create_agent_id("seph");
        mike_first.add_insert_at(0, &[], 0, "b");
        mike_first.add_insert_at(1, &[], 0, "Aa");
        mike_first.add_delete_at(1, &[0, 2], 0..2);

        assert!(is_eq(&seph_first, &mike_first));

        // Same history again, with seph's insert split in two and mike's insert in between.
        let mut interleaved = OpLog::new();
        interleaved.get_or_create_agent_id("seph");
        interleaved.get_or_create_agent_id("mike");
        interleaved.add_insert_at(0, &[], 0, "A");
        interleaved.add_insert_at(1, &[], 0, "b");
        interleaved.add_insert_at(0, &[0], 1, "a");
        interleaved.add_delete_at(0, &[1, 2], 0..2);

        assert!(is_eq(&seph_first, &interleaved));
        assert!(is_eq(&mike_first, &interleaved));
    }
}