diamond-types 1.0.0

The world's fastest text CRDT
Documentation
use std::collections::BinaryHeap;
use smallvec::SmallVec;
use rle::{AppendRle, HasLength};
use crate::list::OpLog;
use crate::dtrange::DTRange;
use crate::rle::KVPair;
use crate::AgentId;
use crate::frontier::debug_assert_frontier_sorted;
use crate::history::MinimalHistoryEntry;

impl OpLog {
    /// Find all the items to merge from other into self.
    fn to_merge(&self, other: &Self, agent_map: &[AgentId]) -> SmallVec<[DTRange; 4]> {
        // This method is in many ways a baby version of diff_slow, with some changes:
        // - We only look at the frontier. (This is not configurable - but it could be)
        // - It maps spans from other -> self
        // - Rather than having OnlyA / OnlyB items which mutually annihilate each other, this
        //   method simply discards items as soon as we find them in self.

        // Much like diff(), this method could be optimized easily by checking for some common
        // cases. I'm not sure how important is is though, since I doubt this method will be used
        // much.

        let mut queue = BinaryHeap::new();
        // dbg!(&other.frontier, &other.history);
        for ord in &other.version {
            queue.push(*ord);
        }

        let mut result = SmallVec::new();

        while let Some(mut ord) = queue.pop() {
            // let mut ord = ord;
            // dbg!(ord, &queue);

            // Cases:
            // - ord is within self. In that case, discard it.
            // - ord not within self. Find the longest run we can - constrained by other txn and
            //  (agent,seq) pairs. If we find something we know, add to result and end. If not,
            //  add parents to queue.
            let containing_txn = other.history.entries.find_packed(ord);

            // Discard any other entries from queue which name the same txn

            while let Some(peek_ord) = queue.peek() {
                let peek_ord = peek_ord;
                if *peek_ord >= containing_txn.span.start {
                    queue.pop();
                } else {
                    break;
                }
            }

            loop { // Add as much as we can from this txn.
                let (other_span, offset) = other.client_with_localtime.find_packed_with_offset(ord);
                let self_agent = agent_map[other_span.1.agent as usize];
                let seq = other_span.1.seq_range.start + offset;

                // Find out how many items we can eat
                let (r, offset) = self.client_data[self_agent as usize]
                    .item_times.find_sparse(seq);
                if r.is_ok() {
                    // Overlap here. Discard from the queue.
                    break;
                }

                let id_start = ord - offset;
                if containing_txn.span.start >= id_start {
                    // We can take everything from the start of the txn.
                    result.push_reversed_rle((containing_txn.span.start..ord + 1).into());

                    // And push parents.
                    for p in containing_txn.parents.iter() {
                        queue.push(*p);
                    }

                    break;
                } else {
                    // Take back to id_start and iterate.
                    result.push_reversed_rle((id_start..ord + 1).into());
                    ord = id_start - 1;
                }
            }
        }

        result
    }

    /// Add all missing operations from the other oplog into this oplog. This method is mostly used
    /// by testing code, since you rarely have two local oplogs to merge together.
    pub fn add_missing_operations_from(&mut self, other: &Self) {
        // [other.agent] => self.agent
        let mut agent_map = Vec::with_capacity(other.client_data.len());

        // TODO: Construct this lazily.
        for c in other.client_data.iter() {
            let self_agent = self.get_or_create_agent_id(c.name.as_str());
            agent_map.push(self_agent);
        }

        // So we need to figure out which changes in other *aren't* in self. To do that, I'll walk
        // backwards through other, looking for changes which are missing in self.

        let spans = self.to_merge(other, &agent_map);
        // dbg!(&spans);

        let mut time = self.len();
        for &s in spans.iter().rev() {
            // Operations
            let mut t = time;
            for (KVPair(_, op), content) in other.iter_range_simple(s) {
                // Operations don't need to be mapped at all.
                // dbg!(&op, content);
                self.push_op_internal(t, op.loc, op.kind, content);
                t += op.len();
            }

            // Agent assignments
            t = time;
            for mut span in other.iter_mappings_range(s) {
                // Map other agent ID -> self agent IDs.
                span.agent = agent_map[span.agent as usize];
                self.assign_time_to_crdt_span(t, span);
                t += span.len();
            }

            // History entries (parents)
            t = time;
            for mut hist_entry in other.history.entries
                .iter_range_map_packed(s, |e| MinimalHistoryEntry::from(e)) {

                let len = hist_entry.len();
                let span = (t..t + len).into();
                // We need to convert other parents to self parents. This is a bit gross but eh.
                // dbg!(&hist_entry.parents);
                for t in &mut hist_entry.parents {
                    let mut id = other.time_to_crdt_id(*t);
                    id.agent = agent_map[id.agent as usize];
                    let self_time = self.crdt_id_to_time(id);
                    *t = self_time;
                }

                hist_entry.parents.sort_unstable();
                // hist_entry.parents.sort_unstable_by(|a, b| a.cmp(b));
                debug_assert_frontier_sorted(&hist_entry.parents);
                // dbg!(&hist_entry.parents);

                self.history.insert(&hist_entry.parents, span);
                self.advance_frontier(&hist_entry.parents, span);
                t += len;
            }

            time += s.len();
        }
    }
}

#[cfg(test)]
mod test {
    use crate::list::OpLog;

    fn merge_into_and_check(dest: &mut OpLog, src: &OpLog) {
        // dbg!(&dest);
        dest.add_missing_operations_from(&src);
        dest.dbg_check(true);
        // dbg!(&dest);
        assert_eq!(dest, src);
    }

    fn merge_both_and_check(a: &mut OpLog, b: &mut OpLog) {
        // dbg!(&dest);
        a.add_missing_operations_from(&b);
        // dbg!(&a);
        a.dbg_check(true);

        b.add_missing_operations_from(&a);
        b.dbg_check(true);
        // dbg!(&dest);

        dbg!(&a, &b);
        assert_eq!(a, b);
    }

    #[test]
    fn smoke() {
        let mut a = OpLog::new();
        let mut b = OpLog::new();
        assert_eq!(a, b);
        // merge_and_check(&mut a, &b);

        a.get_or_create_agent_id("seph");
        a.add_insert(0, 0, "hi");
        merge_into_and_check(&mut b, &a);

        // Ok now we'll append data to both oplogs
        a.add_insert(0, 0, "aaa");
        b.get_or_create_agent_id("mike");
        b.add_delete_without_content(1, 0..2);

        merge_both_and_check(&mut a, &mut b);
    }
}