ron_crdt/
heap.rs

1//! Frame Heap
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::fmt::Debug;
6
7use crate::{Frame, Op, Terminator};
8
9/// Helper trait for configuring `Heap`s order.
10pub trait FrameOrd<'a>: From<Frame<'a>> + Iterator<Item = Op> {
11    /// Return the first Op in the Frame.
12    fn peek<'b>(&'b self) -> Option<&'b Op>;
13    /// Compare `a` and `b` using the primary comparison function.
14    fn primary_cmp(a: &Op, b: &Op) -> Ordering;
15    /// Compare `a` and `b` using the secondary comparison function.
16    fn secondary_cmp(a: &Op, b: &Op) -> Ordering;
17}
18
/// Private newtype around a frame `T`.
///
/// It exists so that the ordering traits required by `BinaryHeap` can be
/// implemented in terms of `FrameOrd` instead of on `T` itself.
#[derive(Debug)]
struct Wrapper<T> {
    /// The wrapped frame.
    inner: T,
}
23
24impl<'a, T> PartialOrd for Wrapper<T>
25where
26    T: FrameOrd<'a>,
27{
28    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
29        Some(self.cmp(other))
30    }
31}
32
33impl<'a, T> Ord for Wrapper<T>
34where
35    T: FrameOrd<'a>,
36{
37    fn cmp(&self, other: &Self) -> Ordering {
38        match (self.inner.peek(), other.inner.peek()) {
39            (None, None) => Ordering::Equal,
40            (None, Some(_)) => Ordering::Less,
41            (Some(_), None) => Ordering::Greater,
42            (Some(a), Some(b)) => {
43                let ret = T::primary_cmp(a, b);
44                if ret == Ordering::Equal {
45                    T::secondary_cmp(a, b)
46                } else {
47                    ret
48                }
49            }
50        }
51        .reverse()
52    }
53}
54
55impl<'a, T> PartialEq for Wrapper<T>
56where
57    T: FrameOrd<'a>,
58{
59    fn eq(&self, other: &Self) -> bool {
60        self.cmp(other) == Ordering::Equal
61    }
62}
63
64impl<'a, T> Eq for Wrapper<T> where T: FrameOrd<'a> {}
65
66/// Am Iterator heap of Frames.
67pub struct Heap<T> {
68    heap: BinaryHeap<Wrapper<T>>,
69    top: Option<Op>,
70}
71
72impl<'a, T> Heap<T>
73where
74    T: FrameOrd<'a> + Debug,
75{
76    /// Create a new Heap from `batch`.
77    pub fn new(batch: Vec<Frame<'a>>) -> Self {
78        let mut heap = BinaryHeap::default();
79
80        for mut f in batch {
81            loop {
82                match f.peek().cloned() {
83                    Some(Op { term: Terminator::Header, .. })
84                    | Some(Op { term: Terminator::Query, .. }) => {
85                        f.next();
86                    }
87                    Some(_) => {
88                        heap.push(Wrapper { inner: f.into() });
89                        break;
90                    }
91                    None => {
92                        break;
93                    }
94                }
95            }
96        }
97
98        let mut ret = Heap { heap: heap, top: None };
99        let top = ret.advance();
100        ret.top = top;
101        ret
102    }
103
104    /// Returns the smallest of all initial Ops of all Frames.
105    pub fn top<'b>(&'b self) -> Option<&'b Op> {
106        self.top.as_ref()
107    }
108
109    /// Returns and removes the smallest of all initial Ops of all Frames.
110    pub fn pop(&mut self) -> Option<Op> {
111        use std::mem;
112
113        if self.top.is_some() {
114            let mut ret = None;
115
116            mem::swap(&mut ret, &mut self.top);
117            self.top = self.advance();
118            ret
119        } else {
120            None
121        }
122    }
123
124    fn advance(&mut self) -> Option<Op> {
125        loop {
126            match self.heap.pop() {
127                Some(mut frm) => {
128                    match frm.inner.peek().cloned() {
129                        Some(Op { term: Terminator::Header, .. })
130                        | Some(Op { term: Terminator::Query, .. }) => {
131                            frm.inner.next();
132                            self.heap.push(frm);
133                            /* continue */
134                        }
135                        Some(op) => {
136                            self.heap.push(frm);
137                            self.drain(&op);
138                            return Some(op);
139                        }
140                        None => {}
141                    }
142                }
143                None => {
144                    return None;
145                }
146            }
147        }
148    }
149
150    fn drain(&mut self, op: &Op) {
151        let mut frames = Vec::with_capacity(self.heap.len());
152
153        while let Some(mut frm) = self.heap.pop() {
154            loop {
155                match frm.inner.peek().cloned() {
156                    Some(ref p) if T::primary_cmp(p, op) == Ordering::Equal => {
157                        frm.inner.next();
158                    }
159                    _ => {
160                        break;
161                    }
162                }
163            }
164            frames.push(frm);
165        }
166
167        self.heap.extend(frames);
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::UUID;

    use crate::Heap;

    /// `FrameOrd` implementation used to drive `Heap` in the `merge` test.
    #[derive(Debug)]
    struct HeapOrd<'a>(Frame<'a>);

    impl<'a> From<Frame<'a>> for HeapOrd<'a> {
        fn from(frame: Frame<'a>) -> Self {
            HeapOrd(frame)
        }
    }

    impl<'a> Iterator for HeapOrd<'a> {
        type Item = Op;

        fn next(&mut self) -> Option<Op> {
            self.0.next()
        }
    }

    impl<'a> FrameOrd<'a> for HeapOrd<'a> {
        fn peek(&self) -> Option<&Op> {
            self.0.peek()
        }

        /// Ops with differing events are compared by event with the operands
        /// swapped; Ops sharing an event are compared by location.
        fn primary_cmp(a: &Op, b: &Op) -> Ordering {
            if a.event != b.event {
                return UUID::weak_cmp(&b.event, &a.event);
            }

            UUID::weak_cmp(&a.location, &b.location)
        }

        /// Compares by location only.
        fn secondary_cmp(a: &Op, b: &Op) -> Ordering {
            UUID::weak_cmp(&a.location, &b.location)
        }
    }

    /// Merging three frames through the heap must produce exactly the Ops of
    /// the expected frame, in the same order.
    #[test]
    fn merge() {
        let inputs = vec![
            Frame::parse("*rga#test@0:0!@1'A'@2'B'"),
            Frame::parse("*rga#test@0:0!@1'A'@3'C'"),
            Frame::parse("*rga#test@0:0!@4'D'@5'E'"),
        ];
        // Expected merged sequence: D E A C B
        let mut expected = Frame::parse("*rga#test@4'D',@5'E'@1'A'@3'C'@2'B'");
        let mut heap = Heap::<HeapOrd>::new(inputs);

        while let Some(op) = heap.pop() {
            assert_eq!(expected.next(), Some(op));
        }

        // The heap must not stop early: nothing may be left over.
        assert!(expected.next().is_none());
    }
}