1use timely::progress::frontier::AntichainRef;
12use timely::progress::{frontier::Antichain, Timestamp};
13use timely::container::PushInto;
14
15use crate::logging::{BatcherEvent, Logger};
16use crate::trace::{Batcher, Description};
17
18pub struct MergeBatcher<M: Merger> {
20 chains: Vec<Vec<M::Chunk>>,
24 stash: Vec<M::Chunk>,
26 merger: M,
28 lower: Antichain<M::Time>,
30 frontier: Antichain<M::Time>,
32 logger: Option<Logger>,
34 operator_id: usize,
36}
37
38impl<M> Batcher for MergeBatcher<M>
39where
40 M: Merger<Time: Timestamp>,
41{
42 type Time = M::Time;
43 type Output = M::Chunk;
44
45 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
46 Self {
47 logger,
48 operator_id,
49 merger: M::default(),
50 chains: Vec::new(),
51 stash: Vec::new(),
52 frontier: Antichain::new(),
53 lower: Antichain::from_elem(M::Time::minimum()),
54 }
55 }
56
57 fn seal(&mut self, upper: Antichain<M::Time>) -> (Vec<Self::Output>, Description<M::Time>) {
62 while self.chains.len() > 1 {
64 let list1 = self.chain_pop().unwrap();
65 let list2 = self.chain_pop().unwrap();
66 let merged = self.merge_by(list1, list2);
67 self.chain_push(merged);
68 }
69 let merged = self.chain_pop().unwrap_or_default();
70
71 let mut kept = Vec::new();
73 let mut readied = Vec::new();
74 self.frontier.clear();
75
76 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
77
78 if !kept.is_empty() {
79 self.chain_push(kept);
80 }
81
82 self.stash.clear();
83
84 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
85 self.lower = upper;
86 (readied, description)
87 }
88
89 #[inline]
91 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
92 self.frontier.borrow()
93 }
94}
95
96impl<M: Merger> PushInto<M::Chunk> for MergeBatcher<M> {
97 fn push_into(&mut self, chunk: M::Chunk) {
98 self.insert_chain(vec![chunk]);
99 }
100}
101
102impl<M: Merger> MergeBatcher<M> {
103 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
106 if !chain.is_empty() {
107 self.chain_push(chain);
108 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
109 let list1 = self.chain_pop().unwrap();
110 let list2 = self.chain_pop().unwrap();
111 let merged = self.merge_by(list1, list2);
112 self.chain_push(merged);
113 }
114 }
115 }
116
117 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
119 let mut output = Vec::with_capacity(list1.len() + list2.len());
121 self.merger.merge(list1, list2, &mut output, &mut self.stash);
122
123 output
124 }
125
126 #[inline]
128 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
129 let chain = self.chains.pop();
130 self.account(chain.iter().flatten().map(M::account), -1);
131 chain
132 }
133
134 #[inline]
136 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
137 self.account(chain.iter().map(M::account), 1);
138 self.chains.push(chain);
139 }
140
141 #[inline]
146 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
147 if let Some(logger) = &self.logger {
148 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
149 for (records_, size_, capacity_, allocations_) in items {
150 records = records.saturating_add_unsigned(records_);
151 size = size.saturating_add_unsigned(size_);
152 capacity = capacity.saturating_add_unsigned(capacity_);
153 allocations = allocations.saturating_add_unsigned(allocations_);
154 }
155 logger.log(BatcherEvent {
156 operator: self.operator_id,
157 records_diff: records * diff,
158 size_diff: size * diff,
159 capacity_diff: capacity * diff,
160 allocations_diff: allocations * diff,
161 })
162 }
163 }
164}
165
166impl<M: Merger> Drop for MergeBatcher<M> {
167 fn drop(&mut self) {
168 while self.chain_pop().is_some() {}
170 }
171}
172
173pub trait Merger: Default {
175 type Chunk: Default;
177 type Time;
179 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
181 fn extract(
183 &mut self,
184 merged: Vec<Self::Chunk>,
185 upper: AntichainRef<Self::Time>,
186 frontier: &mut Antichain<Self::Time>,
187 readied: &mut Vec<Self::Chunk>,
188 kept: &mut Vec<Self::Chunk>,
189 stash: &mut Vec<Self::Chunk>,
190 );
191
192 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
194}
195
196pub mod vec {
198
199 use std::marker::PhantomData;
200 use timely::container::SizableContainer;
201 use timely::progress::frontier::{Antichain, AntichainRef};
202 use timely::PartialOrder;
203 use crate::trace::implementations::merge_batcher::Merger;
204
205 pub struct VecMerger<D, T, R> {
207 _marker: PhantomData<(D, T, R)>,
208 }
209
210 impl<D, T, R> Default for VecMerger<D, T, R> {
211 fn default() -> Self { Self { _marker: PhantomData } }
212 }
213
214 impl<D, T, R> VecMerger<D, T, R> {
215 fn target_capacity() -> usize {
220 timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
221 }
222 fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
224 let target = Self::target_capacity();
225 let mut container = stash.pop().unwrap_or_default();
226 container.clear();
227 if container.capacity() != target {
229 container = Vec::with_capacity(target);
230 }
231 container
232 }
233 fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
235 if queue.is_empty() {
236 let target = Self::target_capacity();
237 if stash.len() < 2 {
238 let mut recycled = Vec::from(std::mem::take(queue));
239 recycled.clear();
240 if recycled.capacity() == target {
241 stash.push(recycled);
242 }
243 }
244 if let Some(chunk) = iter.next() {
245 *queue = std::collections::VecDeque::from(chunk);
246 }
247 }
248 }
249 }
250
251 impl<D, T, R> Merger for VecMerger<D, T, R>
252 where
253 D: Ord + Clone + 'static,
254 T: Ord + Clone + PartialOrder + 'static,
255 R: crate::difference::Semigroup + 'static,
256 {
257 type Chunk = Vec<(D, T, R)>;
258 type Time = T;
259
260 fn merge(
261 &mut self,
262 list1: Vec<Vec<(D, T, R)>>,
263 list2: Vec<Vec<(D, T, R)>>,
264 output: &mut Vec<Vec<(D, T, R)>>,
265 stash: &mut Vec<Vec<(D, T, R)>>,
266 ) {
267 use std::cmp::Ordering;
268 use std::collections::VecDeque;
269
270 let mut iter1 = list1.into_iter();
271 let mut iter2 = list2.into_iter();
272 let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
273 let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
274
275 let mut result = self.empty(stash);
276
277 while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
279 match (d1, t1).cmp(&(d2, t2)) {
280 Ordering::Less => {
281 result.push(q1.pop_front().unwrap());
282 }
283 Ordering::Greater => {
284 result.push(q2.pop_front().unwrap());
285 }
286 Ordering::Equal => {
287 let (d, t, mut r1) = q1.pop_front().unwrap();
288 let (_, _, r2) = q2.pop_front().unwrap();
289 r1.plus_equals(&r2);
290 if !r1.is_zero() {
291 result.push((d, t, r1));
292 }
293 }
294 }
295
296 if result.at_capacity() {
297 output.push(std::mem::take(&mut result));
298 result = self.empty(stash);
299 }
300
301 if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
303 if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
304 }
305
306 if !result.is_empty() { output.push(result); }
308 for q in [q1, q2] {
309 if !q.is_empty() { output.push(Vec::from(q)); }
310 }
311 output.extend(iter1);
312 output.extend(iter2);
313 }
314
315 fn extract(
316 &mut self,
317 merged: Vec<Vec<(D, T, R)>>,
318 upper: AntichainRef<T>,
319 frontier: &mut Antichain<T>,
320 ship: &mut Vec<Vec<(D, T, R)>>,
321 kept: &mut Vec<Vec<(D, T, R)>>,
322 stash: &mut Vec<Vec<(D, T, R)>>,
323 ) {
324 let mut keep = self.empty(stash);
325 let mut ready = self.empty(stash);
326
327 for mut chunk in merged {
328 for (data, time, diff) in chunk.drain(..) {
330 if upper.less_equal(&time) {
331 frontier.insert_with(&time, |time| time.clone());
332 keep.push((data, time, diff));
333 } else {
334 ready.push((data, time, diff));
335 }
336 if keep.at_capacity() {
337 kept.push(std::mem::take(&mut keep));
338 keep = self.empty(stash);
339 }
340 if ready.at_capacity() {
341 ship.push(std::mem::take(&mut ready));
342 ready = self.empty(stash);
343 }
344 }
345 if chunk.capacity() == Self::target_capacity() {
347 stash.push(chunk);
348 }
349 }
350 if !keep.is_empty() { kept.push(keep); }
351 if !ready.is_empty() { ship.push(ready); }
352 }
353
354 fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
355 (chunk.len(), 0, 0, 0)
356 }
357 }
358}