1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
//! Accumulator of `(key, val, wgt)` triples based on sorting and run-length encoding.
//!
//! Differential dataflow operators receive large numbers of `(key, val, wgt)` triples, and must
//! group these records by `key`, and then accumulate the `wgt`s of equal `val` values, discarding
//! those whose weight accumulates to zero.
//!
//! The grouped and accumulated form can be much more compact than a list of triples, by removing
//! the repetition of `key`s, accumulating multiple `wgt`s together, and discarding `val`s whose
//! weight accumulation is zero. Because we are space-sensitive, we would like to be able to
//! maintain received data compactly, rather than have to accumulate the large list of triples and
//! sort it only once complete.
//!
//! This module provides a `Accumulator` structure capable of receiving `(key, val, wgt)` triples and
//! which compacts them as they arrive, maintaining no more than 1.5x the space required for the
//! list of values. It can be configured to have a minimum capacity, so if there is enough space
//! the `Accumulator` structure will only accumulate elements in a list, then sort and coalesce, doing
//! exactly what we would have done in the simple case. As memory gets tighter, it behaves more
//! responsiby.

use iterators::coalesce::Coalesce;

use std::fmt::Debug;

use timely_sort::Unsigned;

/// A compressed representation of the accumulation of `(key, val, wgt)` triples.
// TODO : RLE where a run of two of the same elements means a value in a second array.
// TODO : this would probably improve compressed representations of small sets (those without much
// TODO : key repetition). Compressing these better means we can go longer before merging, which
// TODO : should make most everything else better too.
#[derive(Debug, Clone)]
pub struct Compact<K, V> {
    /// An ordered list of the distinct keys.
    pub keys: Vec<K>,
    /// Counts for each key indicating the number of corresponding values in `self.vals`.
    ///
    /// The list is maintained separately in the interest of eventually having run-length coding
    /// treat non-repetitions better.
    pub cnts: Vec<u32>,
    /// A list of values, ordered within each key group.
    pub vals: Vec<(V, i32)>,
}

// abomonate!(Compact : keys, cnts, vals);

impl<K: Ord+Debug, V: Ord> Compact<K, V> {
    /// Constructs a new `Compact` with indicated initial capacities.
    ///
    /// Most operations with `Compact` eventually shrink the amount of memory to fit whatever they
    /// have used, so the main concern here is to avoid grossly over-allocating. Typically, these
    /// structs are created in a transient compaction step and not maintained open, meaning we can
    /// afford to be a bit sloppy.
    pub fn new(k: usize, v: usize) -> Compact<K, V> {
        Compact {
            keys: Vec::with_capacity(k),
            cnts: Vec::with_capacity(k),
            vals: Vec::with_capacity(v),
            // wgts: Vec::with_capacity(w),
        }
    }
    /// Reports the size in bytes, used elsewhere to determine how much space we should use for
    /// buffering uncompressed elements.
    pub fn size(&self) -> usize {
        self.keys.len() * ::std::mem::size_of::<K>() +
        self.cnts.len() * 4 +
        self.vals.len() * ::std::mem::size_of::<(V,i32)>()
    }

    /// Populates the `Compact` from an iterator of ordered `(key, val, wgt)` triples.
    ///
    /// The `Compact` does not know about the ordering, only that it should look for repetitions of
    /// in the sequences of `key` and `wgt`.
    // #[inline(never)]
    pub fn extend<I: Iterator<Item=((K, V), i32)>>(&mut self, mut iterator: I) {

        // populate a new `Compact` with merged, coalesced data.
        if let Some(((mut old_key, val), wgt)) = iterator.next() {

            let mut key_cnt = 1;

            // always stash the val
            self.vals.push((val, wgt));

            for ((key, val), wgt) in iterator {

                // always stash the val
                self.vals.push((val,wgt));

                // if the key has changed, stash the key
                if old_key != key {
                    self.keys.push(old_key);
                    self.cnts.push(key_cnt);
                    old_key = key;
                    key_cnt = 0;
                }

                key_cnt += 1;
            }

            self.keys.push(old_key);
            self.cnts.push(key_cnt);
        }
    }

    /// Extends the compact collection by a set of key-value updates.
    // #[inline(never)]
    pub fn extend_by(&mut self, buffer: &mut Vec<((K, V), i32)>) {

        // coalesce things
        let mut cursor = 0;
        for index in 1 .. buffer.len() {
            if buffer[cursor].0 == buffer[index].0 {
                buffer[cursor].1 += buffer[index].1;
            }
            else {
                if buffer[cursor].1 != 0 {
                    cursor += 1;
                }
                buffer.swap(cursor, index);
            }
        }
        if buffer[cursor].1 != 0 {
            cursor += 1;
        }
        buffer.truncate(cursor);

        let mut iter = buffer.drain(..);
        if let Some(((key1,val1),wgt1)) = iter.next() {

            let mut prev_len = self.vals.len();

            self.keys.push(key1);
            self.vals.push((val1, wgt1));

            for ((key, val), wgt) in iter {

                // if the key has changed, stash the key
                if self.keys[self.keys.len() - 1] != key {
                    self.keys.push(key);
                    self.cnts.push((self.vals.len() - prev_len) as u32);
                    prev_len = self.vals.len();
                }

                // always stash the val
                self.vals.push((val,wgt));
            }

            self.cnts.push((self.vals.len() - prev_len) as u32);
        }
    }

    /// Extends the compact collection from the result of a timely_sort radix sorter.
    // #[inline(never)]
    pub fn from_radix<U: Unsigned+Default, F: Fn(&K)->U>(source: &mut Vec<Vec<((K,V),i32)>>, function: &F) -> Option<Compact<K,V>> {

        let mut size = 0;
        for list in source.iter() {
            size += list.len();
        }

        let mut result = Compact::new(size,size);
        let mut buffer = vec![];

        let mut current = Default::default();
        
        for ((key, val), wgt) in source.iter_mut().flat_map(|x| x.drain(..)) {
            let hash = function(&key);
            if buffer.len() > 0 && hash != current {
                // if hash < current { println!("  radix sort error? {} < {}", hash, current); }
                buffer.sort_by(|x: &((K,V),i32),y: &((K,V),i32)| x.0.cmp(&y.0));
        
                // result.extend(buffer.drain(..).coalesce());
                result.extend_by(&mut buffer);
            }
            buffer.push(((key,val),wgt));
            current = hash;
        }
        
        if buffer.len() > 0 {
            // hsort_by(&mut buffer, &|x: &((K,V),i32)| &x.0);
            buffer.sort_by(|x: &((K,V),i32),y: &((K,V),i32)| x.0.cmp(&y.0));
            result.extend(buffer.drain(..).coalesce());
        }

        if result.vals.len() > 0 {
            result.keys.shrink_to_fit();
            result.cnts.shrink_to_fit();
            result.vals.shrink_to_fit();

            Some(result)
        }
        else {
            None
        }
    }

    /// Creates a new session for introducing one key's worth of values.
    pub fn session<'a>(&'a mut self) -> CompactSession<'a, K, V> {
        CompactSession::new(self)
    }

    /// Pushes updates for a key from a supplied iterator.
    pub fn push<I: Iterator<Item=(V, i32)>>(&mut self, key: K, iterator: I) {
        let mut session = self.session();
        for (val, wgt) in iterator {
            session.push(val, wgt);
        }
        session.done(key);
    }
}

/// A session for adding one key's worth of updates.
pub struct CompactSession<'a, K: 'a, V: 'a> {
    compact: &'a mut Compact<K, V>,
    len: usize,
}

impl<'a, K: 'a, V: 'a> CompactSession<'a, K, V> {
    /// Allocates a new session.
    pub fn new(compact: &'a mut Compact<K, V>) -> CompactSession<'a, K, V> {
        let len = compact.vals.len();
        CompactSession {
            compact: compact,
            len: len,
        }
    }
    /// Adds an update for the key.
    #[inline]
    pub fn push(&mut self, val: V, wgt: i32) {
        self.compact.vals.push((val,wgt));
    }
    /// Consumes the session and finalizes the updates.
    pub fn done(self, key: K) {
        if self.compact.vals.len() > self.len {
            self.compact.keys.push(key);
            self.compact.cnts.push((self.compact.vals.len() - self.len) as u32);
        }
    }
}