differential_dataflow/trace/implementations/
merge_batcher.rs1use std::collections::VecDeque;
4
5use timely::communication::message::RefOrMut;
6use timely::logging::WorkerIdentifier;
7use timely::logging_core::Logger;
8use timely::progress::{frontier::Antichain, Timestamp};
9
10use crate::difference::Semigroup;
11use crate::logging::{BatcherEvent, DifferentialEvent};
12use crate::trace::{Batcher, Builder};
13
14pub struct MergeBatcher<K, V, T, D> {
16 sorter: MergeSorter<(K, V), T, D>,
17 lower: Antichain<T>,
18 frontier: Antichain<T>,
19}
20
21impl<K, V, T, D> Batcher for MergeBatcher<K, V, T, D>
22where
23 K: Ord + Clone,
24 V: Ord + Clone,
25 T: Timestamp,
26 D: Semigroup,
27{
28 type Item = ((K,V),T,D);
29 type Time = T;
30
31 fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
32 MergeBatcher {
33 sorter: MergeSorter::new(logger, operator_id),
34 frontier: Antichain::new(),
35 lower: Antichain::from_elem(T::minimum()),
36 }
37 }
38
39 #[inline(never)]
40 fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
41 match batch {
43 RefOrMut::Ref(reference) => {
44 let mut owned: Vec<_> = self.sorter.empty();
47 owned.clone_from(reference);
48 self.sorter.push(&mut owned);
49 },
50 RefOrMut::Mut(reference) => {
51 self.sorter.push(reference);
52 }
53 }
54 }
55
56 #[inline(never)]
61 fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
62
63 let mut merged = Vec::new();
64 self.sorter.finish_into(&mut merged);
65
66 let mut builder = {
69 let mut keys = 0;
70 let mut vals = 0;
71 let mut upds = 0;
72 let mut prev_keyval = None;
73 for buffer in merged.iter() {
74 for ((key, val), time, _) in buffer.iter() {
75 if !upper.less_equal(time) {
76 if let Some((p_key, p_val)) = prev_keyval {
77 if p_key != key {
78 keys += 1;
79 vals += 1;
80 }
81 else if p_val != val {
82 vals += 1;
83 }
84 upds += 1;
85 }
86 prev_keyval = Some((key, val));
87 }
88 }
89 }
90 B::with_capacity(keys, vals, upds)
91 };
92
93 let mut kept = Vec::new();
94 let mut keep = Vec::new();
95
96 self.frontier.clear();
97
98 for mut buffer in merged.drain(..) {
100 for ((key, val), time, diff) in buffer.drain(..) {
101 if upper.less_equal(&time) {
102 self.frontier.insert(time.clone());
103 if keep.len() == keep.capacity() && !keep.is_empty() {
104 kept.push(keep);
105 keep = self.sorter.empty();
106 }
107 keep.push(((key, val), time, diff));
108 }
109 else {
110 builder.push(((key, val), time, diff));
111 }
112 }
113 self.sorter.push(&mut buffer);
115 }
116
117 if !keep.is_empty() {
119 kept.push(keep);
120 }
121 if !kept.is_empty() {
122 self.sorter.push_list(kept);
123 }
124
125 let mut buffer = Vec::new();
130 self.sorter.push(&mut buffer);
131 while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 {
133 buffer = Vec::new();
134 self.sorter.push(&mut buffer);
135 }
136
137 let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
138 self.lower = upper;
139 seal
140 }
141
142 fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
144 self.frontier.borrow()
145 }
146}
147
148struct MergeSorter<D, T, R> {
149 queue: Vec<Vec<Vec<(D, T, R)>>>,
151 stash: Vec<Vec<(D, T, R)>>,
152 logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
153 operator_id: usize,
154}
155
156impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
157
158 const BUFFER_SIZE_BYTES: usize = 1 << 13;
159
160 fn buffer_size() -> usize {
161 let size = ::std::mem::size_of::<(D, T, R)>();
162 if size == 0 {
163 Self::BUFFER_SIZE_BYTES
164 } else if size <= Self::BUFFER_SIZE_BYTES {
165 Self::BUFFER_SIZE_BYTES / size
166 } else {
167 1
168 }
169 }
170
171 #[inline]
172 fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
173 Self {
174 logger,
175 operator_id,
176 queue: Vec::new(),
177 stash: Vec::new(),
178 }
179 }
180
181 #[inline]
182 pub fn empty(&mut self) -> Vec<(D, T, R)> {
183 self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
184 }
185
186 #[inline]
187 pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
188 let mut batch = if self.stash.len() > 2 {
191 ::std::mem::replace(batch, self.stash.pop().unwrap())
192 }
193 else {
194 ::std::mem::take(batch)
195 };
196
197 if !batch.is_empty() {
198 crate::consolidation::consolidate_updates(&mut batch);
199 self.account([batch.len()], 1);
200 self.queue_push(vec![batch]);
201 while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
202 let list1 = self.queue_pop().unwrap();
203 let list2 = self.queue_pop().unwrap();
204 let merged = self.merge_by(list1, list2);
205 self.queue_push(merged);
206 }
207 }
208 }
209
210 pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
213 while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
214 let list1 = self.queue_pop().unwrap();
215 let list2 = self.queue_pop().unwrap();
216 let merged = self.merge_by(list1, list2);
217 self.queue_push(merged);
218 }
219 self.queue_push(list);
220 }
221
222 #[inline(never)]
223 pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
224 while self.queue.len() > 1 {
225 let list1 = self.queue_pop().unwrap();
226 let list2 = self.queue_pop().unwrap();
227 let merged = self.merge_by(list1, list2);
228 self.queue_push(merged);
229 }
230
231 if let Some(mut last) = self.queue_pop() {
232 ::std::mem::swap(&mut last, target);
233 }
234 }
235
236 #[inline(never)]
238 fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
239 self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);
240
241 use std::cmp::Ordering;
242
243 let mut output = Vec::with_capacity(list1.len() + list2.len());
245 let mut result = self.empty();
246
247 let mut list1 = list1.into_iter();
248 let mut list2 = list2.into_iter();
249
250 let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
251 let mut head2 = VecDeque::from(list2.next().unwrap_or_default());
252
253 while !head1.is_empty() && !head2.is_empty() {
255
256 while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
257
258 let cmp = {
259 let x = head1.front().unwrap();
260 let y = head2.front().unwrap();
261 (&x.0, &x.1).cmp(&(&y.0, &y.1))
262 };
263 match cmp {
264 Ordering::Less => result.push(head1.pop_front().unwrap()),
265 Ordering::Greater => result.push(head2.pop_front().unwrap()),
266 Ordering::Equal => {
267 let (data1, time1, mut diff1) = head1.pop_front().unwrap();
268 let (_data2, _time2, diff2) = head2.pop_front().unwrap();
269 diff1.plus_equals(&diff2);
270 if !diff1.is_zero() {
271 result.push((data1, time1, diff1));
272 }
273 }
274 }
275 }
276
277 if result.capacity() == result.len() {
278 output.push(result);
279 result = self.empty();
280 }
281
282 if head1.is_empty() {
283 let done1 = Vec::from(head1);
284 if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
285 head1 = VecDeque::from(list1.next().unwrap_or_default());
286 }
287 if head2.is_empty() {
288 let done2 = Vec::from(head2);
289 if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
290 head2 = VecDeque::from(list2.next().unwrap_or_default());
291 }
292 }
293
294 if !result.is_empty() { output.push(result); }
295 else if result.capacity() > 0 { self.stash.push(result); }
296
297 if !head1.is_empty() {
298 let mut result = self.empty();
299 for item1 in head1 { result.push(item1); }
300 output.push(result);
301 }
302 output.extend(list1);
303
304 if !head2.is_empty() {
305 let mut result = self.empty();
306 for item2 in head2 { result.push(item2); }
307 output.push(result);
308 }
309 output.extend(list2);
310
311 output
312 }
313}
314
315impl<D, T, R> MergeSorter<D, T, R> {
316 #[inline]
318 fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
319 let batch = self.queue.pop();
320 self.account(batch.iter().flatten().map(Vec::len), -1);
321 batch
322 }
323
324 #[inline]
326 fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
327 self.account(batch.iter().map(Vec::len), 1);
328 self.queue.push(batch);
329 }
330
331 fn account<I: IntoIterator<Item=usize>>(&self, items: I, diff: isize) {
336 if let Some(logger) = &self.logger {
337 let mut records= 0isize;
338 for len in items {
339 records = records.saturating_add_unsigned(len);
340 }
341 logger.log(BatcherEvent {
342 operator: self.operator_id,
343 records_diff: records * diff,
344 size_diff: 0,
345 capacity_diff: 0,
346 allocations_diff: 0,
347 })
348 }
349 }
350}
351
352impl<D, T, R> Drop for MergeSorter<D, T, R> {
353 fn drop(&mut self) {
354 while self.queue_pop().is_some() { }
355 }
356}