1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::fmt::Debug;
6
7use crate::{Frame, Op, Terminator};
8
9pub trait FrameOrd<'a>: From<Frame<'a>> + Iterator<Item = Op> {
11 fn peek<'b>(&'b self) -> Option<&'b Op>;
13 fn primary_cmp(a: &Op, b: &Op) -> Ordering;
15 fn secondary_cmp(a: &Op, b: &Op) -> Ordering;
17}
18
/// Heap entry around a frame adapter.
///
/// The wrapper exists so that the adapter can be given an ordering based on
/// the op it currently exposes (see the `Ord` implementation for `Wrapper`).
#[derive(Debug)]
struct Wrapper<T> {
    /// The wrapped frame adapter.
    inner: T,
}
23
24impl<'a, T> PartialOrd for Wrapper<T>
25where
26 T: FrameOrd<'a>,
27{
28 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
29 Some(self.cmp(other))
30 }
31}
32
33impl<'a, T> Ord for Wrapper<T>
34where
35 T: FrameOrd<'a>,
36{
37 fn cmp(&self, other: &Self) -> Ordering {
38 match (self.inner.peek(), other.inner.peek()) {
39 (None, None) => Ordering::Equal,
40 (None, Some(_)) => Ordering::Less,
41 (Some(_), None) => Ordering::Greater,
42 (Some(a), Some(b)) => {
43 let ret = T::primary_cmp(a, b);
44 if ret == Ordering::Equal {
45 T::secondary_cmp(a, b)
46 } else {
47 ret
48 }
49 }
50 }
51 .reverse()
52 }
53}
54
55impl<'a, T> PartialEq for Wrapper<T>
56where
57 T: FrameOrd<'a>,
58{
59 fn eq(&self, other: &Self) -> bool {
60 self.cmp(other) == Ordering::Equal
61 }
62}
63
64impl<'a, T> Eq for Wrapper<T> where T: FrameOrd<'a> {}
65
66pub struct Heap<T> {
68 heap: BinaryHeap<Wrapper<T>>,
69 top: Option<Op>,
70}
71
72impl<'a, T> Heap<T>
73where
74 T: FrameOrd<'a> + Debug,
75{
76 pub fn new(batch: Vec<Frame<'a>>) -> Self {
78 let mut heap = BinaryHeap::default();
79
80 for mut f in batch {
81 loop {
82 match f.peek().cloned() {
83 Some(Op { term: Terminator::Header, .. })
84 | Some(Op { term: Terminator::Query, .. }) => {
85 f.next();
86 }
87 Some(_) => {
88 heap.push(Wrapper { inner: f.into() });
89 break;
90 }
91 None => {
92 break;
93 }
94 }
95 }
96 }
97
98 let mut ret = Heap { heap: heap, top: None };
99 let top = ret.advance();
100 ret.top = top;
101 ret
102 }
103
104 pub fn top<'b>(&'b self) -> Option<&'b Op> {
106 self.top.as_ref()
107 }
108
109 pub fn pop(&mut self) -> Option<Op> {
111 use std::mem;
112
113 if self.top.is_some() {
114 let mut ret = None;
115
116 mem::swap(&mut ret, &mut self.top);
117 self.top = self.advance();
118 ret
119 } else {
120 None
121 }
122 }
123
124 fn advance(&mut self) -> Option<Op> {
125 loop {
126 match self.heap.pop() {
127 Some(mut frm) => {
128 match frm.inner.peek().cloned() {
129 Some(Op { term: Terminator::Header, .. })
130 | Some(Op { term: Terminator::Query, .. }) => {
131 frm.inner.next();
132 self.heap.push(frm);
133 }
135 Some(op) => {
136 self.heap.push(frm);
137 self.drain(&op);
138 return Some(op);
139 }
140 None => {}
141 }
142 }
143 None => {
144 return None;
145 }
146 }
147 }
148 }
149
150 fn drain(&mut self, op: &Op) {
151 let mut frames = Vec::with_capacity(self.heap.len());
152
153 while let Some(mut frm) = self.heap.pop() {
154 loop {
155 match frm.inner.peek().cloned() {
156 Some(ref p) if T::primary_cmp(p, op) == Ordering::Equal => {
157 frm.inner.next();
158 }
159 _ => {
160 break;
161 }
162 }
163 }
164 frames.push(frm);
165 }
166
167 self.heap.extend(frames);
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::UUID;

    use crate::Heap;

    /// Test adapter: a plain `Frame` combined with an event/location ordering.
    #[derive(Debug)]
    struct HeapOrd<'a>(Frame<'a>);

    impl<'a> From<Frame<'a>> for HeapOrd<'a> {
        fn from(frame: Frame<'a>) -> Self {
            HeapOrd(frame)
        }
    }

    impl<'a> Iterator for HeapOrd<'a> {
        type Item = Op;

        fn next(&mut self) -> Option<Op> {
            self.0.next()
        }
    }

    impl<'a> FrameOrd<'a> for HeapOrd<'a> {
        fn peek(&self) -> Option<&Op> {
            self.0.peek()
        }

        // Ops with the same event are ordered by location; otherwise the
        // events are compared with the operands swapped.
        fn primary_cmp(a: &Op, b: &Op) -> Ordering {
            if a.event == b.event {
                return UUID::weak_cmp(&a.location, &b.location);
            }

            UUID::weak_cmp(&b.event, &a.event)
        }

        fn secondary_cmp(a: &Op, b: &Op) -> Ordering {
            UUID::weak_cmp(&a.location, &b.location)
        }
    }

    /// Merging three frames must produce exactly the ops of the expected
    /// frame, in the same order.
    #[test]
    fn merge() {
        let inputs = vec![
            Frame::parse("*rga#test@0:0!@1'A'@2'B'"),
            Frame::parse("*rga#test@0:0!@1'A'@3'C'"),
            Frame::parse("*rga#test@0:0!@4'D'@5'E'"),
        ];
        let mut expected = Frame::parse("*rga#test@4'D',@5'E'@1'A'@3'C'@2'B'");
        let mut heap = Heap::<HeapOrd>::new(inputs);

        while let Some(op) = heap.pop() {
            assert_eq!(expected.next(), Some(op));
        }

        // The heap must not stop before the expected frame is used up.
        assert!(expected.next().is_none());
    }
}