differential_dataflow/trace/implementations/
huffman_container.rs

1//! A slice container that Huffman encodes its contents.
2
3use std::collections::BTreeMap;
4use timely::container::PushInto;
5
6use crate::trace::implementations::{BatchContainer, OffsetList};
7
8use self::wrapper::Wrapped;
9use self::encoded::Encoded;
10use self::huffman::Huffman;
11
12/// A container that contains slices `[B]` as items.
13pub struct HuffmanContainer<B: Ord+Clone> {
14    /// Either encoded data or raw data.
15    inner: Result<(Huffman<B>, Vec<u8>), Vec<B>>,
16    /// Offsets that bound each contained slice.
17    ///
18    /// The length will be one greater than the number of contained items.
19    offsets: OffsetList,
20    /// Counts of the number of each pattern we've seen.
21    stats: BTreeMap<B, i64>
22}
23
24impl<B> HuffmanContainer<B>
25where
26    B: Ord + Clone,
27{
28    /// Prints statistics about encoded containers.
29    pub fn print(&self) {
30        if let Ok((_huff, bytes)) = &self.inner {
31            println!("Bytes: {:?}, Symbols: {:?}", bytes.len(), self.stats.values().sum::<i64>());
32        }
33    }
34}
35
36impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
37    fn push_into(&mut self, item: Vec<B>) {
38        for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
39        match &mut self.inner {
40            Ok((huffman, bytes)) => {
41                bytes.extend(huffman.encode(item.iter()));
42                self.offsets.push(bytes.len());
43            },
44            Err(raw) => { 
45                raw.extend(item); 
46                self.offsets.push(raw.len());
47            }
48        }
49    }
50}
51
52impl<'a, B: Ord + Clone + 'static> PushInto<Wrapped<'a, B>> for HuffmanContainer<B> {
53    fn push_into(&mut self, item: Wrapped<'a, B>) {
54        match item.decode() {
55            Ok(decoded) => {
56                for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; }
57
58            },
59            Err(symbols) => {
60                for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
61            }
62        }
63        match (item.decode(), &mut self.inner) {
64            (Ok(decoded), Ok((huffman, bytes))) => {
65                bytes.extend(huffman.encode(decoded));
66                self.offsets.push(bytes.len());
67            }
68            (Ok(decoded), Err(raw)) => {
69                raw.extend(decoded.cloned());
70                self.offsets.push(raw.len());
71            }
72            (Err(symbols), Ok((huffman, bytes))) => {
73                bytes.extend(huffman.encode(symbols.iter()));
74                self.offsets.push(bytes.len());
75            }
76            (Err(symbols), Err(raw)) => {
77                raw.extend(symbols.iter().cloned());
78                self.offsets.push(raw.len());
79            }
80        }
81    }
82}
83
84impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
85    type Owned = Vec<B>;
86    type ReadItem<'a> = Wrapped<'a, B>;
87
88    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
89
90    fn with_capacity(size: usize) -> Self {
91        let mut offsets = OffsetList::with_capacity(size + 1);
92        offsets.push(0);
93        Self {
94            inner: Err(Vec::with_capacity(size)),
95            offsets,
96            stats: Default::default(),
97        }
98    }
99    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
100
101        if cont1.len() > 0 { cont1.print(); }
102        if cont2.len() > 0 { cont2.print(); }
103
104        let mut counts = BTreeMap::default();
105        for (symbol, count) in cont1.stats.iter() {
106            *counts.entry(symbol.clone()).or_insert(0) += count;
107        }
108        for (symbol, count) in cont2.stats.iter() {
109            *counts.entry(symbol.clone()).or_insert(0) += count;
110        }
111
112        let bytes = Vec::with_capacity(counts.values().cloned().sum::<i64>() as usize);
113        let huffman = Huffman::create_from(counts);
114        let inner = Ok((huffman, bytes));
115        // : Err(Vec::with_capacity(length))
116
117        let length = cont1.offsets.len() + cont2.offsets.len() - 2;
118        let mut offsets = OffsetList::with_capacity(length + 1);
119        offsets.push(0);
120        Self {
121            inner,
122            offsets,
123            stats: Default::default(),
124        }
125    }
126    fn index(&self, index: usize) -> Self::ReadItem<'_> {
127        let lower = self.offsets.index(index);
128        let upper = self.offsets.index(index+1);
129        match &self.inner {
130            Ok((huffman, bytes)) => Wrapped::encoded(Encoded::new(huffman, &bytes[lower .. upper])),
131            Err(raw) => Wrapped::decoded(&raw[lower .. upper]),
132        }
133    }
134    fn len(&self) -> usize {
135        self.offsets.len() - 1
136    }
137}
138/// Default implementation introduces a first offset.
139impl<B: Ord+Clone> Default for HuffmanContainer<B> {
140    fn default() -> Self {
141        let mut offsets = OffsetList::with_capacity(1);
142        offsets.push(0);
143        Self {
144            inner: Err(Vec::new()),
145            offsets,
146            stats: Default::default(),
147        }
148    }
149}
150
151mod wrapper {
152
153    use crate::IntoOwned;
154    use super::Encoded;
155
156    pub struct Wrapped<'a, B: Ord> {
157        inner: Result<Encoded<'a, B>, &'a [B]>,
158    }
159
160    impl<'a, B: Ord> Wrapped<'a, B> {
161        /// Returns either a decoding iterator, or just the bytes themselves.
162        pub fn decode(&'a self) -> Result<impl Iterator<Item=&'a B> + 'a, &'a [B]> {
163            match &self.inner {
164                Ok(encoded) => Ok(encoded.decode()),
165                Err(symbols) => Err(symbols),
166            }
167        }
168        /// A wrapper around an encoded sequence.
169        pub fn encoded(e: Encoded<'a, B>) -> Self { Self { inner: Ok(e) } }
170        /// A wrapper around a decoded sequence.
171        pub fn decoded(d: &'a [B]) -> Self { Self { inner: Err(d) } }
172    }
173
174    impl<'a, B: Ord> Copy for Wrapped<'a, B> { }
175    impl<'a, B: Ord> Clone for Wrapped<'a, B> {
176        fn clone(&self) -> Self { *self }
177    }
178
179    use std::cmp::Ordering;
180    impl<'a, 'b, B: Ord> PartialEq<Wrapped<'a, B>> for Wrapped<'b, B> {
181        fn eq(&self, other: &Wrapped<'a, B>) -> bool {
182            match (self.decode(), other.decode()) {
183                (Ok(decode1), Ok(decode2)) => decode1.eq(decode2),
184                (Ok(decode1), Err(bytes2)) => decode1.eq(bytes2.iter()),
185                (Err(bytes1), Ok(decode2)) => bytes1.iter().eq(decode2),
186                (Err(bytes1), Err(bytes2)) => bytes1.eq(bytes2),
187            }
188        }
189    }
190    impl<'a, B: Ord> Eq for Wrapped<'a, B> { }
191    impl<'a, 'b, B: Ord> PartialOrd<Wrapped<'a, B>> for Wrapped<'b, B> {
192        fn partial_cmp(&self, other: &Wrapped<'a, B>) -> Option<Ordering> {
193            match (self.decode(), other.decode()) {
194                (Ok(decode1), Ok(decode2)) => decode1.partial_cmp(decode2),
195                (Ok(decode1), Err(bytes2)) => decode1.partial_cmp(bytes2.iter()),
196                (Err(bytes1), Ok(decode2)) => bytes1.iter().partial_cmp(decode2),
197                (Err(bytes1), Err(bytes2)) => bytes1.partial_cmp(bytes2),
198            }
199        }
200    }
201    impl<'a, B: Ord> Ord for Wrapped<'a, B> {
202        fn cmp(&self, other: &Self) -> Ordering {
203            self.partial_cmp(other).unwrap()
204        }
205    }
206    impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> {
207        type Owned = Vec<B>;
208        fn into_owned(self) -> Self::Owned {
209            match self.decode() {
210                Ok(decode) => decode.cloned().collect(),
211                Err(bytes) => bytes.to_vec(),
212            }
213        }
214        fn clone_onto(self, other: &mut Self::Owned) {
215            other.clear();
216            match self.decode() {
217                Ok(decode) => other.extend(decode.cloned()),
218                Err(bytes) => other.extend_from_slice(bytes),
219            }
220        }
221        fn borrow_as(owned: &'a Self::Owned) -> Self {
222            Self { inner: Err(&owned[..]) }
223        }
224    }
225}
226
227/// Wrapper around a Huffman decoder and byte slices, decodeable to a byte sequence.
228mod encoded {
229
230    use super::Huffman;
231
232    /// Welcome to GATs!
233    pub struct Encoded<'a, B: Ord> {
234        /// Text that decorates the data.
235        huffman: &'a Huffman<B>,
236        /// The data itself.
237        bytes: &'a [u8],
238    }
239
240    impl<'a, B: Ord> Encoded<'a, B> {
241        /// Returns either a decoding iterator, or just the bytes themselves.
242        pub fn decode(&'a self) -> impl Iterator<Item=&'a B> + 'a {
243            self.huffman.decode(self.bytes.iter().cloned())
244        }
245        pub fn new(huffman: &'a Huffman<B>, bytes: &'a [u8]) -> Self {
246            Self { huffman, bytes }
247        }
248    }
249
250    impl<'a, B: Ord> Copy for Encoded<'a, B> { }
251    impl<'a, B: Ord> Clone for Encoded<'a, B> {
252        fn clone(&self) -> Self { *self }
253    }
254}
255
mod huffman {

    use std::collections::BTreeMap;
    use std::convert::TryInto;

    use self::decoder::Decoder;
    use self::encoder::Encoder;

    /// Encoding and decoding state for Huffman codes.
    ///
    /// Each call to `encode` produces a self-delimiting byte sequence: the symbols' codewords,
    /// then a single `1` bit, then zero bits up to the next byte boundary. An empty symbol
    /// sequence produces no bytes at all. `decode` strips that terminator, so every encoded
    /// symbol is recovered and padding bits are never mistaken for symbols.
    pub struct Huffman<T: Ord> {
        /// For each symbol, an entry `(bits, code)`: the low `bits` bits of `code` are its codeword.
        /// Probably every `code` fits in a `u64`, unless there are crazy frequencies?
        encode: BTreeMap<T, (usize, u64)>,
        /// Byte-indexed decoding table; codewords longer than 8 bits continue in nested tables.
        decode: [Decode<T>; 256],
    }
    impl<T: Ord> Huffman<T> {

        /// Encodes `symbols` as a self-delimiting sequence of bytes.
        ///
        /// The codewords are followed by a terminating `1` bit and zero padding to a byte
        /// boundary. The final byte therefore records exactly how many of its bits are data.
        /// An empty sequence produces no bytes.
        ///
        /// # Panics
        ///
        /// The returned iterator panics on a symbol that this code was not built from.
        pub fn encode<'a, I>(&'a self, symbols: I) -> Encoder<'a, T, I::IntoIter>
        where
            I: IntoIterator<Item = &'a T>,
        {
            Encoder::new(&self.encode, symbols.into_iter())
        }

        /// Decodes bytes produced by `encode` back into the sequence of symbols.
        ///
        /// Iteration ends early, rather than yielding garbage, if the input is truncated.
        pub fn decode<I>(&self, bytes: I) -> Decoder<'_, T, I::IntoIter>
        where
            I: IntoIterator<Item=u8>
        {
            Decoder::new(&self.decode, bytes.into_iter())
        }

        /// Builds a canonical Huffman code from symbol occurrence counts.
        ///
        /// An empty `counts` yields a code that can only encode empty sequences. A lone symbol
        /// receives a one-bit codeword, because a zero-bit codeword would encode to no bytes
        /// and its occurrences could not be recovered.
        ///
        /// # Panics
        ///
        /// Panics if the decoding table ends up with unfilled slots (an internal error).
        pub fn create_from(counts: BTreeMap<T, i64>) -> Self where T: Clone {

            if counts.is_empty() {
                return Self {
                    encode: Default::default(),
                    decode: Decode::map(),
                };
            }

            if counts.len() == 1 {
                let symbol = counts.into_iter().next().unwrap().0;
                // Every lookup yields the symbol after consuming one bit. Well-formed input only
                // ever contains `0` codeword bits, but filling all slots keeps the table void-free.
                let mut decode = Decode::map();
                for entry in decode.iter_mut() {
                    *entry = Decode::Symbol(symbol.clone(), 1);
                }
                let mut encode = BTreeMap::new();
                encode.insert(symbol, (1, 0));
                return Huffman { encode, decode };
            }

            // Repeatedly merge the two least frequent nodes (counts are negated for the max-heap).
            let mut heap = std::collections::BinaryHeap::new();
            for (item, count) in counts {
                heap.push((-count, Node::Leaf(item)));
            }
            let mut tree = Vec::with_capacity(2 * heap.len() - 1);
            while heap.len() > 1 {
                let (count1, least1) = heap.pop().unwrap();
                let (count2, least2) = heap.pop().unwrap();
                let fork = Node::Fork(tree.len(), tree.len()+1);
                tree.push(least1);
                tree.push(least2);
                heap.push((count1 + count2, fork));
            }
            tree.push(heap.pop().unwrap().1);

            // Determine each leaf's depth; with at least two symbols every depth is at least 1.
            let mut levels = Vec::with_capacity(1 + tree.len()/2);
            let mut todo = vec![(tree.last().unwrap(), 0)];
            while let Some((node, level)) = todo.pop() {
                match node {
                    Node::Leaf(sym) => { levels.push((level, sym)); },
                    Node::Fork(l,r) => {
                        todo.push((&tree[*l], level + 1));
                        todo.push((&tree[*r], level + 1));
                    },
                }
            }
            // Assign canonical codewords in order of increasing length.
            levels.sort_by(|x,y| x.0.cmp(&y.0));
            let mut code: u64 = 0;
            let mut prev_level = 0;
            let mut encode = BTreeMap::new();
            let mut decode = Decode::map();
            for (level, sym) in levels {
                if prev_level != level {
                    code <<= level - prev_level;
                    prev_level = level;
                }
                encode.insert(sym.clone(), (level, code));
                Self::insert_decode(&mut decode, sym, level, code << (64-level));

                code += 1;
            }

            for (index, entry) in decode.iter().enumerate() {
                if entry.any_void() {
                    panic!("VOID FOUND: {:?}", index);
                }
            }

            Huffman {
                encode,
                decode,
            }
        }

        /// Inserts `symbol` into `map`, given its `bits`-bit codeword left-aligned in `code`.
        ///
        /// Codewords of at most 8 bits fill every slot they prefix. Longer codewords consume
        /// 8 bits here and continue in a nested table.
        fn insert_decode(map: &mut [Decode<T>; 256], symbol: &T, bits: usize, code: u64) where T: Clone {
            let byte: u8 = (code >> 56).try_into().unwrap();
            if bits <= 8 {
                for off in 0 .. (1 << (8 - bits)) {
                    map[(byte as usize) + off] = Decode::Symbol(symbol.clone(), bits);
                }
            }
            else {
                if let Decode::Void = &map[byte as usize] {
                    map[byte as usize] = Decode::Further(Box::new(Decode::map()));
                }
                if let Decode::Further(next_map) = &mut map[byte as usize] {
                    Self::insert_decode(next_map, symbol, bits - 8, code << 8);
                }
            }
        }
    }
    /// Tree structure for Huffman bit length determination.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
    enum Node<T> {
        Leaf(T),
        Fork(usize, usize),
    }

    /// One slot of a byte-indexed decoding table.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
    pub enum Decode<T> {
        /// An as-yet unfilled slot.
        #[default]
        Void,
        /// The symbol, and the number of bits consumed.
        Symbol(T, usize),
        /// An additional map to push subsequent bytes at.
        Further(Box<[Decode<T>; 256]>),
    }

    impl<T> Decode<T> {
        /// Tests to see if the map contains any invalid values.
        ///
        /// A correctly initialized map will have no invalid values.
        /// A map with invalid values will be unable to decode some
        /// input byte sequences.
        fn any_void(&self) -> bool {
            match self {
                Decode::Void => true,
                Decode::Symbol(_,_) => false,
                Decode::Further(map) => map.iter().any(|m| m.any_void()),
            }
        }
        /// Creates a new map containing invalid values.
        fn map() -> [Decode<T>; 256] {
            let mut vec = Vec::with_capacity(256);
            for _ in 0 .. 256 {
                vec.push(Decode::Void);
            }
            vec.try_into().ok().unwrap()
        }
    }


    /// A tabled Huffman decoder, written as an iterator.
    mod decoder {

        use super::Decode;

        /// Streams the symbols encoded in bytes produced by `Huffman::encode`.
        ///
        /// One byte of lookahead is kept so that the final byte can be recognized, and its
        /// terminating `1` bit and zero padding stripped before decoding.
        #[derive(Copy, Clone)]
        pub struct Decoder<'a, T, I> {
            decode: &'a [Decode<T>; 256],
            bytes: I,
            /// The input byte following those already absorbed, once fetched.
            lookahead: Option<u8>,
            /// Bits absorbed but not yet decoded, right-aligned.
            pending_byte: u16,
            /// Number of valid bits in `pending_byte` (never more than 15).
            pending_bits: usize,
            /// Whether the final input byte has been absorbed (or the input was empty).
            finished: bool,
        }

        impl<'a, T, I> Decoder<'a, T, I> {
            pub fn new(decode: &'a [Decode<T>; 256], bytes: I) -> Self {
                Self {
                    decode,
                    bytes,
                    lookahead: None,
                    pending_byte: 0,
                    pending_bits: 0,
                    finished: false,
                }
            }
        }

        impl<'a, T, I> Decoder<'a, T, I>
        where
            I: Iterator<Item=u8>,
        {
            /// Absorbs the next input byte into `pending_byte`, returning `false` if none remain.
            ///
            /// Only the data bits of the final byte are kept, i.e. those before its terminating `1` bit.
            fn refill(&mut self) -> bool {
                if self.finished {
                    return false;
                }
                let current = match self.lookahead.take().or_else(|| self.bytes.next()) {
                    Some(byte) => byte,
                    None => {
                        self.finished = true;
                        return false;
                    }
                };
                self.lookahead = self.bytes.next();
                if self.lookahead.is_some() {
                    self.pending_byte = (self.pending_byte << 8) | u16::from(current);
                    self.pending_bits += 8;
                } else {
                    self.finished = true;
                    // Terminator bit plus trailing zero padding. A zero byte has no terminator
                    // (malformed input) and contributes no data.
                    let padding = current.trailing_zeros() as usize + 1;
                    if padding <= 8 {
                        let data_bits = 8 - padding;
                        self.pending_byte = (self.pending_byte << data_bits) | (u16::from(current) >> padding);
                        self.pending_bits += data_bits;
                    }
                }
                true
            }
        }

        impl<'a, T, I> Iterator for Decoder<'a, T, I>
        where
            I: Iterator<Item=u8>,
        {
            type Item = &'a T;
            fn next(&mut self) -> Option<&'a T> {
                // Navigate `self.decode`, restocking bits whenever possible. Once the input is
                // exhausted, the remaining data bits are looked up with zero padding.
                let mut map = self.decode;
                loop {
                    while self.pending_bits < 8 && self.refill() { }
                    if self.pending_bits == 0 {
                        return None;
                    }
                    let index = if self.pending_bits >= 8 {
                        (self.pending_byte >> (self.pending_bits - 8)) as usize
                    } else {
                        (self.pending_byte << (8 - self.pending_bits)) as usize
                    };
                    match &map[index] {
                        Decode::Void => { panic!("invalid decoding map"); }
                        Decode::Symbol(s, bits) => {
                            if *bits > self.pending_bits {
                                // Truncated input: the codeword runs past the remaining data.
                                self.pending_bits = 0;
                                self.pending_byte = 0;
                                return None;
                            }
                            self.pending_bits -= bits;
                            self.pending_byte &= (1 << self.pending_bits) - 1;
                            return Some(s);
                        }
                        Decode::Further(next_map) => {
                            if self.pending_bits < 8 {
                                // Truncated input: a long codeword was cut short.
                                self.pending_bits = 0;
                                self.pending_byte = 0;
                                return None;
                            }
                            self.pending_bits -= 8;
                            self.pending_byte &= (1 << self.pending_bits) - 1;
                            map = next_map;
                        }
                    }
                }
            }
        }
    }

    /// A tabled Huffman encoder, written as an iterator.
    mod encoder {

        use std::collections::BTreeMap;

        /// Streams the bytes encoding a sequence of symbols.
        ///
        /// Emits each symbol's codeword most-significant bit first. If any symbol was encoded,
        /// it then emits a single `1` bit followed by zero padding to the next byte boundary.
        #[derive(Copy, Clone)]
        pub struct Encoder<'a, T, I> {
            encode: &'a BTreeMap<T, (usize, u64)>,
            symbols: I,
            /// Bits produced but not yet shipped, right-aligned.
            pending_byte: u64,
            /// Number of valid bits in `pending_byte`.
            pending_bits: usize,
            /// Whether at least one symbol has been encoded.
            started: bool,
            /// Whether `symbols` has been exhausted (and any terminator appended).
            finished: bool,
        }

        impl<'a, T, I> Encoder<'a, T, I> {
            pub fn new(encode: &'a BTreeMap<T, (usize, u64)>, symbols: I) -> Self {
                Self {
                    encode,
                    symbols,
                    pending_byte: 0,
                    pending_bits: 0,
                    started: false,
                    finished: false,
                }
            }
        }

        impl<'a, T: Ord, I> Iterator for Encoder<'a, T, I>
        where
            I: Iterator<Item=&'a T>,
        {
            type Item = u8;
            fn next(&mut self) -> Option<u8> {
                // Ship bytes out of `self.pending_byte`, restocking from `self.symbols`.
                while self.pending_bits < 8 {
                    if self.finished {
                        // Ship the final fractional byte, zero-padded, if there is one.
                        if self.pending_bits == 0 {
                            return None;
                        }
                        let byte = self.pending_byte << (8 - self.pending_bits);
                        self.pending_bits = 0;
                        self.pending_byte = 0;
                        return Some(byte as u8);
                    }
                    match self.symbols.next() {
                        Some(symbol) => {
                            let &(bits, code) = self.encode.get(symbol).unwrap();
                            self.pending_byte <<= bits;
                            self.pending_byte += code;
                            self.pending_bits += bits;
                            self.started = true;
                        }
                        None => {
                            self.finished = true;
                            // Terminator bit: lets the decoder tell data bits from padding.
                            if self.started {
                                self.pending_byte = (self.pending_byte << 1) | 1;
                                self.pending_bits += 1;
                            }
                        }
                    }
                }

                let byte = self.pending_byte >> (self.pending_bits - 8);
                self.pending_bits -= 8;
                self.pending_byte &= (1 << self.pending_bits) - 1;
                Some(byte as u8)
            }
        }
    }

}