1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//! A general purpose `Batcher` implementation based on radix sort.
use timely::progress::frontier::Antichain;
use timely_sort::{MSBRadixSorter, RadixSorterBase};
use ::Diff;
use hashable::HashOrdered;
use lattice::Lattice;
use trace::{Batch, Batcher, Builder};
/// Accumulates unordered update tuples and turns them into batches.
///
/// Updates have the shape `((key, val), time, diff)`. They are held in a list of
/// buffers until `seal` extracts the ones that are ready to be shipped.
pub struct RadixBatcher<K, V, T, R, B>
where
    K: HashOrdered,
    T: PartialOrd,
    R: Diff,
    B: Batch<K, V, T, R>,
{
    // Ties the batcher to the batch type `B` it produces without storing a value of it.
    phantom: ::std::marker::PhantomData<B>,
    // Buffered updates that have not yet been sealed into a batch.
    buffers: Vec<Vec<((K, V), T, R)>>,
    // Number of buffers that were present right after the most recent compaction.
    sorted: usize,
    // Radix sorter, driven with the hash of each update's key.
    sorter: MSBRadixSorter<((K, V), T, R)>,
    // Spare buffers kept around so their allocations can be reused.
    stash: Vec<Vec<((K, V), T, R)>>,
    // The `upper` passed to the previous `seal` call; used as the next batch's lower bound.
    lower: Vec<T>,
    // Scratch antichain that is cleared and refilled on each `frontier` call.
    frontier: Antichain<T>,
}
impl<K, V, T, R, B> RadixBatcher<K, V, T, R, B>
where
    K: Ord+HashOrdered,
    V: Ord,
    T: Lattice+Ord,
    R: Diff,
    B: Batch<K, V, T, R>
{
    // Hands out a buffer: a stashed one when available, otherwise a fresh allocation
    // with room for `1 << 10` updates.
    fn empty(&mut self) -> Vec<((K, V), T, R)> {
        match self.stash.pop() {
            Some(buffer) => buffer,
            None => Vec::with_capacity(1 << 10),
        }
    }

    // Shrinks the buffered updates by sorting them on the hash of their key and
    // consolidating each sorted chunk with `consolidate_vec`.
    //
    // In principle this could instead move data into a trace, which is more compact
    // still and easier to merge than re-sorting.
    #[inline(never)]
    fn compact(&mut self) {
        self.sorter.sort_and(
            &mut self.buffers,
            &|update: &((K,V),T,R)| (update.0).0.hashed(),
            |chunk| consolidate_vec(chunk)
        );
        // NOTE(review): `rebalance` presumably exchanges spare buffers between the sorter
        // and the stash with a target of 256 -- confirm against `timely_sort`.
        self.sorter.rebalance(&mut self.stash, 256);
        // Whatever is left in the stash is released; possibly too aggressive.
        self.stash.clear();
        // Remember how many buffers survived, so `push_batch` can tell how much has
        // arrived since this compaction.
        self.sorted = self.buffers.len();
    }

    // Splits the buffered updates in two, preserving their relative order:
    //
    // * updates whose time is not greater or equal to any element of `upper` are returned;
    // * all other updates become the new contents of `self.buffers`.
    //
    // Drained input buffers are pushed onto the stash so their allocations can be reused.
    #[inline(never)]
    fn segment(&mut self, upper: &[T]) -> Vec<Vec<((K,V),T,R)>> {

        // Appends `item` to `tail`. When `tail` has no spare capacity it is first swapped
        // for a replacement buffer (from `stash`, or freshly allocated), and the old
        // buffer is retired to `full` unless it is empty.
        fn stage<D>(item: D, tail: &mut Vec<D>, full: &mut Vec<Vec<D>>, stash: &mut Vec<Vec<D>>) {
            if tail.len() == tail.capacity() {
                let fresh = stash.pop().unwrap_or_else(|| Vec::with_capacity(1 << 10));
                let spent = ::std::mem::replace(tail, fresh);
                if !spent.is_empty() {
                    full.push(spent);
                }
            }
            tail.push(item);
        }

        let mut kept = Vec::new();      // full buffers of updates that must wait.
        let mut sealed = Vec::new();    // full buffers of updates that are ready.

        let mut kept_tail = self.empty();
        let mut sealed_tail = self.empty();

        // Visit every update of every buffer and route it to one side or the other.
        for mut buffer in self.buffers.drain(..) {
            for update in buffer.drain(..) {
                if upper.iter().any(|t| t.less_equal(&update.1)) {
                    stage(update, &mut kept_tail, &mut kept, &mut self.stash);
                }
                else {
                    stage(update, &mut sealed_tail, &mut sealed, &mut self.stash);
                }
            }
            self.stash.push(buffer);
        }

        // Partially filled tails still carry data; empty ones are simply dropped.
        if !kept_tail.is_empty() { kept.push(kept_tail); }
        if !sealed_tail.is_empty() { sealed.push(sealed_tail); }

        self.buffers = kept;
        sealed
    }
}
impl<K, V, T, R, B> Batcher<K, V, T, R, B> for RadixBatcher<K, V, T, R, B>
where
    K: Ord+HashOrdered,
    V: Ord,
    T: Lattice+Ord+Clone,
    R: Diff,
    B: Batch<K, V, T, R>,
{
    /// Creates a batcher with no buffered updates whose lower bound is `T::minimum()`.
    fn new() -> Self {
        RadixBatcher {
            phantom: ::std::marker::PhantomData,
            buffers: Vec::new(),
            sorter: MSBRadixSorter::new(),
            sorted: 0,
            stash: Vec::new(),
            frontier: Antichain::new(),
            lower: vec![T::minimum()],
        }
    }

    /// Takes the contents of `batch`, leaving it empty.
    ///
    /// When the most recent buffer has room for all of `batch`, the updates are moved into
    /// it and `batch` keeps its allocation; otherwise the whole vector is taken and `batch`
    /// is replaced by a new empty one.
    #[inline(never)]
    fn push_batch(&mut self, batch: &mut Vec<((K,V),T,R)>) {

        let fits_in_last = match self.buffers.last() {
            Some(last) => last.len() + batch.len() <= last.capacity(),
            None => false,
        };

        if fits_in_last {
            if let Some(last) = self.buffers.last_mut() {
                last.extend(batch.drain(..));
            }
        }
        else {
            self.buffers.push(::std::mem::replace(batch, Vec::new()));
        }

        // Compact once the buffer count exceeds both twice the count left by the previous
        // compaction and a floor of 1,000 buffers.
        let threshold = ::std::cmp::max(2 * self.sorted, 1_000);
        if self.buffers.len() > threshold {
            self.compact();
        }
    }

    /// Builds a batch from the buffered updates whose times are not greater or equal to
    /// any time in `upper`, and retains the rest.
    ///
    /// All updates are assumed to have times greater or equal to the `upper` of the previous
    /// call (stored as `lower`): after sealing, the batcher should receive no further updates
    /// with times not greater or equal to that `upper`.
    ///
    /// Possible future work: consolidating before filtering, so that repeated calls need not
    /// re-sort data, and sealing several batches at once when requests arrive with nearly
    /// identical `upper` frontiers.
    #[inline(never)]
    fn seal(&mut self, upper: &[T]) -> B {

        // Pull out the updates that are ready, re-using existing allocations where possible.
        let mut to_seal = self.segment(upper);

        // Sort by key hash, consolidating each sorted chunk with `consolidate_vec`.
        self.sorter.sort_and(
            &mut to_seal,
            &|update: &((K,V),T,R)| (update.0).0.hashed(),
            |chunk| consolidate_vec(chunk)
        );

        // Size the builder for the total number of updates that survived consolidation.
        let total = to_seal.iter().fold(0, |acc, buffer| acc + buffer.len());
        let mut builder = B::Builder::with_capacity(total);

        for ((key, val), time, diff) in to_seal.iter_mut().flat_map(|buffer| buffer.drain(..)) {
            debug_assert!(!diff.is_zero());
            builder.push((key, val, time, diff));
        }

        // Offer the emptied buffers back to the sorter.
        // NOTE(review): the exact meaning of the 256 target is defined by `timely_sort` -- confirm.
        self.sorter.rebalance(&mut to_seal, 256);

        // Finish the batch; `lower` is passed as both the first and the third argument.
        // NOTE(review): the third argument is presumably the `since` frontier -- confirm
        // against the `Builder` trait.
        let batch = builder.done(&self.lower[..], upper, &self.lower[..]);
        self.lower = upper.to_vec();
        batch
    }

    /// Rebuilds the frontier antichain from the times of all buffered updates and returns
    /// its elements.
    fn frontier(&mut self) -> &[T] {
        self.frontier.clear();
        for update in self.buffers.iter().flat_map(|buffer| buffer.iter()) {
            self.frontier.insert(update.1.clone());
        }
        self.frontier.elements()
    }
}
/// Sorts `slice` by `((key, val), time)` and merges updates that agree on both.
///
/// Within each run of equal `((key, val), time)` the differences are summed into the last
/// update of the run, and every update whose difference ends up zero is removed.
#[inline(always)]
fn consolidate_vec<K: Ord+HashOrdered, V: Ord, T:Ord, R: Diff>(slice: &mut Vec<((K,V),T,R)>) {
    slice.sort_by(|left, right| (&left.0, &left.1).cmp(&(&right.0, &right.1)));

    for later in 1 .. slice.len() {
        // Split so the two neighbours can be borrowed mutably at the same time.
        let (head, tail) = slice.split_at_mut(later);
        let previous = &mut head[later - 1];
        let current = &mut tail[0];
        if current.0 == previous.0 && current.1 == previous.1 {
            // Carry the accumulated difference forward and zero out the earlier update.
            current.2 = current.2 + previous.2;
            previous.2 = R::zero();
        }
    }

    slice.retain(|update| !update.2.is_zero());
}