// differential_dataflow/trace/implementations/huffman_container.rs

//! A slice container that Huffman-encodes its contents.

3use std::collections::BTreeMap;
4
5use crate::trace::implementations::{BatchContainer, OffsetList};
6
7use self::wrapper::Wrapped;
8use self::encoded::Encoded;
9use self::huffman::Huffman;
10
11/// A container that contains slices `[B]` as items.
12pub struct HuffmanContainer<B: Ord+Clone> {
13    /// Either encoded data or raw data.
14    inner: Result<(Huffman<B>, Vec<u8>), Vec<B>>,
15    /// Offsets that bound each contained slice.
16    ///
17    /// The length will be one greater than the number of contained items.
18    offsets: OffsetList,
19    /// Counts of the number of each pattern we've seen.
20    stats: BTreeMap<B, i64>
21}
22
23impl<B> HuffmanContainer<B>
24where
25    B: Ord + Clone,
26{
27    /// Prints statistics about encoded containers.
28    pub fn print(&self) {
29        if let Ok((_huff, bytes)) = &self.inner {
30            println!("Bytes: {:?}, Symbols: {:?}", bytes.len(), self.stats.values().sum::<i64>());
31        }
32    }
33}
34
35impl<B> BatchContainer for HuffmanContainer<B>
36where
37    B: Ord + Clone + Sized + 'static,
38{
39    type PushItem = Vec<B>;
40    type ReadItem<'a> = Wrapped<'a, B>;
41    fn push(&mut self, item: Vec<B>) {
42        for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
43        match &mut self.inner {
44            Ok((huffman, bytes)) => {
45                bytes.extend(huffman.encode(item.iter()));
46                self.offsets.push(bytes.len());
47            },
48            Err(raw) => { 
49                raw.extend(item); 
50                self.offsets.push(raw.len());
51            }
52        }
53    }
54    fn copy_push(&mut self, item: &Vec<B>) {
55        use crate::trace::MyTrait;
56        self.copy(<_ as MyTrait>::borrow_as(item));
57    }
58    fn copy(&mut self, item: Self::ReadItem<'_>) {
59        match item.decode() {
60            Ok(decoded) => {
61                for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; }
62
63            },
64            Err(symbols) => {
65                for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
66            }
67        }
68        match (item.decode(), &mut self.inner) {
69            (Ok(decoded), Ok((huffman, bytes))) => {
70                bytes.extend(huffman.encode(decoded));
71                self.offsets.push(bytes.len());
72            }
73            (Ok(decoded), Err(raw)) => {
74                raw.extend(decoded.cloned());
75                self.offsets.push(raw.len());
76            }
77            (Err(symbols), Ok((huffman, bytes))) => { 
78                bytes.extend(huffman.encode(symbols.iter()));
79                self.offsets.push(bytes.len());
80            }
81            (Err(symbols), Err(raw)) => { 
82                raw.extend(symbols.iter().cloned());
83                self.offsets.push(raw.len());
84            }
85        }
86    }
87    fn copy_slice(&mut self, slice: &[Vec<B>]) {
88        for item in slice {
89            self.copy_push(item);
90        }
91    }
92    fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
93        for index in start .. end {
94            self.copy(other.index(index));
95        }
96    }
97    fn with_capacity(size: usize) -> Self {
98        let mut offsets = OffsetList::with_capacity(size + 1);
99        offsets.push(0);
100        Self {
101            inner: Err(Vec::with_capacity(size)),
102            offsets,
103            stats: Default::default(),
104        }
105    }
106    fn reserve(&mut self, _additional: usize) {
107    }
108    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
109
110        if cont1.len() > 0 { cont1.print(); }
111        if cont2.len() > 0 { cont2.print(); }
112
113        let mut counts = BTreeMap::default();
114        for (symbol, count) in cont1.stats.iter() {
115            *counts.entry(symbol.clone()).or_insert(0) += count;
116        }
117        for (symbol, count) in cont2.stats.iter() {
118            *counts.entry(symbol.clone()).or_insert(0) += count;
119        }
120
121        let bytes = Vec::with_capacity(counts.values().cloned().sum::<i64>() as usize);
122        let huffman = Huffman::create_from(counts);
123        let inner = Ok((huffman, bytes));
124        // : Err(Vec::with_capacity(length))
125
126        let length = cont1.offsets.len() + cont2.offsets.len() - 2;
127        let mut offsets = OffsetList::with_capacity(length + 1);
128        offsets.push(0);
129        Self {
130            inner,
131            offsets,
132            stats: Default::default(),
133        }
134    }
135    fn index(&self, index: usize) -> Self::ReadItem<'_> {
136        let lower = self.offsets.index(index);
137        let upper = self.offsets.index(index+1);
138        match &self.inner {
139            Ok((huffman, bytes)) => Wrapped::encoded(Encoded::new(huffman, &bytes[lower .. upper])),
140            Err(raw) => Wrapped::decoded(&raw[lower .. upper]),
141        }
142    }
143    fn len(&self) -> usize {
144        self.offsets.len() - 1
145    }
146}
147/// Default implementation introduces a first offset.
148impl<B: Ord+Clone> Default for HuffmanContainer<B> {
149    fn default() -> Self {
150        let mut offsets = OffsetList::with_capacity(1);
151        offsets.push(0);
152        Self {
153            inner: Err(Vec::new()),
154            offsets,
155            stats: Default::default(),
156        }
157    }
158}
159
160mod wrapper {
161
162    use crate::trace::MyTrait;
163    use super::Encoded;
164
165    pub struct Wrapped<'a, B: Ord> {
166        inner: Result<Encoded<'a, B>, &'a [B]>,
167    }
168
169    impl<'a, B: Ord> Wrapped<'a, B> {
170        /// Returns either a decoding iterator, or just the bytes themselves.
171        pub fn decode(&'a self) -> Result<impl Iterator<Item=&'a B> + 'a, &'a [B]> {
172            match &self.inner {
173                Ok(encoded) => Ok(encoded.decode()),
174                Err(symbols) => Err(symbols),
175            }
176        }
177        /// A wrapper around an encoded sequence.
178        pub fn encoded(e: Encoded<'a, B>) -> Self { Self { inner: Ok(e) } }
179        /// A wrapper around a decoded sequence.
180        pub fn decoded(d: &'a [B]) -> Self { Self { inner: Err(d) } }
181    }
182
183    impl<'a, B: Ord> Copy for Wrapped<'a, B> { }
184    impl<'a, B: Ord> Clone for Wrapped<'a, B> {
185        fn clone(&self) -> Self { *self }
186    }
187
188    use std::cmp::Ordering;
189    impl<'a, 'b, B: Ord> PartialEq<Wrapped<'a, B>> for Wrapped<'b, B> {
190        fn eq(&self, other: &Wrapped<'a, B>) -> bool {
191            match (self.decode(), other.decode()) {
192                (Ok(decode1), Ok(decode2)) => decode1.eq(decode2),
193                (Ok(decode1), Err(bytes2)) => decode1.eq(bytes2.iter()),
194                (Err(bytes1), Ok(decode2)) => bytes1.iter().eq(decode2),
195                (Err(bytes1), Err(bytes2)) => bytes1.eq(bytes2),
196            }
197        }
198    }
199    impl<'a, B: Ord> Eq for Wrapped<'a, B> { }
200    impl<'a, 'b, B: Ord> PartialOrd<Wrapped<'a, B>> for Wrapped<'b, B> {
201        fn partial_cmp(&self, other: &Wrapped<'a, B>) -> Option<Ordering> {
202            match (self.decode(), other.decode()) {
203                (Ok(decode1), Ok(decode2)) => decode1.partial_cmp(decode2),
204                (Ok(decode1), Err(bytes2)) => decode1.partial_cmp(bytes2.iter()),
205                (Err(bytes1), Ok(decode2)) => bytes1.iter().partial_cmp(decode2),
206                (Err(bytes1), Err(bytes2)) => bytes1.partial_cmp(bytes2),
207            }
208        }
209    }
210    impl<'a, B: Ord> Ord for Wrapped<'a, B> {
211        fn cmp(&self, other: &Self) -> Ordering {
212            self.partial_cmp(other).unwrap()
213        }
214    }
215    impl<'a, B: Ord+Clone> MyTrait<'a> for Wrapped<'a, B> {
216        type Owned = Vec<B>;
217        fn into_owned(self) -> Self::Owned {
218            match self.decode() {
219                Ok(decode) => decode.cloned().collect(),
220                Err(bytes) => bytes.to_vec(),
221            }
222        }
223        fn clone_onto(&self, other: &mut Self::Owned) {
224            other.clear();
225            match self.decode() {
226                Ok(decode) => other.extend(decode.cloned()),
227                Err(bytes) => other.extend_from_slice(bytes),
228            }
229        }
230        fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering {
231            match self.decode() {
232                Ok(decode) => decode.partial_cmp(other.iter()).unwrap(),
233                Err(bytes) => bytes.cmp(&other[..]),
234            }
235        }
236        fn borrow_as(other: &'a Self::Owned) -> Self {
237            Self { inner: Err(&other[..]) }
238        }
239    }
240}
241
242/// Wrapper around a Huffman decoder and byte slices, decodeable to a byte sequence.
243mod encoded {
244
245    use super::Huffman;
246
247    /// Welcome to GATs!
248    pub struct Encoded<'a, B: Ord> {
249        /// Text that decorates the data.
250        huffman: &'a Huffman<B>,
251        /// The data itself.
252        bytes: &'a [u8],
253    }
254
255    impl<'a, B: Ord> Encoded<'a, B> {
256        /// Returns either a decoding iterator, or just the bytes themselves.
257        pub fn decode(&'a self) -> impl Iterator<Item=&'a B> + 'a {
258            self.huffman.decode(self.bytes.iter().cloned())
259        }
260        pub fn new(huffman: &'a Huffman<B>, bytes: &'a [u8]) -> Self {
261            Self { huffman, bytes }
262        }
263    }
264
265    impl<'a, B: Ord> Copy for Encoded<'a, B> { }
266    impl<'a, B: Ord> Clone for Encoded<'a, B> {
267        fn clone(&self) -> Self { *self }
268    }
269}
270
mod huffman {

    use std::collections::BTreeMap;
    use std::convert::TryInto;

    use self::decoder::Decoder;
    use self::encoder::Encoder;

    /// Longest codeword `create_from` will assign.
    ///
    /// Codes are left-aligned with `code << (64 - level)` and buffered by the encoder in a
    /// `u64` beside up to seven pending bits; this conservative bound keeps both within 64 bits.
    const MAX_CODE_BITS: usize = 56;

    /// Encoding and decoding tables for a canonical Huffman code over symbols `T`.
    ///
    /// Streams produced by [`Huffman::encode`] end with a single `1` marker bit followed by
    /// zero padding up to a byte boundary. [`Huffman::decode`] uses the marker to separate
    /// data bits from padding in the final byte, so every encoded symbol is recovered.
    pub struct Huffman<T: Ord> {
        /// For each symbol, `(bits, code)`: the low `bits` bits of `code` are its codeword.
        encode: BTreeMap<T, (usize, u64)>,
        /// Byte-indexed decoding table; codewords longer than eight bits continue in nested tables.
        decode: [Decode<T>; 256],
    }

    impl<T: Ord> Huffman<T> {

        /// Encodes `symbols` as a sequence of bytes.
        ///
        /// The output ends with a `1` marker bit and zero padding, so it is one byte long
        /// even for an empty input.
        ///
        /// # Panics
        ///
        /// The returned iterator panics on a symbol that is not in the codebook.
        pub fn encode<'a, I>(&'a self, symbols: I) -> Encoder<'a, T, I::IntoIter>
        where
            I: IntoIterator<Item = &'a T>,
        {
            Encoder::new(&self.encode, symbols.into_iter())
        }

        /// Decodes bytes produced by [`Huffman::encode`] back into symbols.
        ///
        /// An empty byte sequence yields no symbols; a stream that stops part-way through a
        /// codeword yields the symbols decoded before that point.
        pub fn decode<I>(&self, bytes: I) -> Decoder<'_, T, I::IntoIter>
        where
            I: IntoIterator<Item=u8>
        {
            Decoder::new(&self.decode, bytes.into_iter())
        }

        /// Builds a canonical Huffman code from symbol frequencies.
        ///
        /// Empty `counts` produce a codebook that can only encode empty sequences. A lone
        /// symbol receives the one-bit codeword `0`.
        ///
        /// # Panics
        ///
        /// Panics if a codeword would exceed `MAX_CODE_BITS` bits, or if the decoding table
        /// is left with unfilled slots (an internal construction error).
        pub fn create_from(counts: BTreeMap<T, i64>) -> Self where T: Clone {

            if counts.is_empty() {
                return Self {
                    encode: Default::default(),
                    decode: Decode::map(),
                };
            }

            // A single symbol would sit at the root with a zero-length codeword, which encodes
            // nothing and overflows `code << (64 - level)` below; give it one bit instead.
            if counts.len() == 1 {
                let symbol = counts.into_keys().next().unwrap();
                let mut decode = Decode::map();
                // Only `0` bits are ever written, but filling every slot keeps the table free of `Void`.
                for slot in decode.iter_mut() {
                    *slot = Decode::Symbol(symbol.clone(), 1);
                }
                let mut encode = BTreeMap::new();
                encode.insert(symbol, (1, 0));
                return Self { encode, decode };
            }

            // Build the tree bottom-up by repeatedly merging the two least frequent nodes.
            // `BinaryHeap` is a max-heap, so counts are negated.
            let mut heap = std::collections::BinaryHeap::new();
            for (item, count) in counts {
                heap.push((-count, Node::Leaf(item)));
            }
            let mut tree = Vec::with_capacity(2 * heap.len() - 1);
            while heap.len() > 1 {
                let (count1, least1) = heap.pop().unwrap();
                let (count2, least2) = heap.pop().unwrap();
                let fork = Node::Fork(tree.len(), tree.len()+1);
                tree.push(least1);
                tree.push(least2);
                heap.push((count1 + count2, fork));
            }
            tree.push(heap.pop().unwrap().1);

            // The depth of each leaf is the length of its codeword.
            let mut levels = Vec::with_capacity(1 + tree.len()/2);
            let mut todo = vec![(tree.last().unwrap(), 0)];
            while let Some((node, level)) = todo.pop() {
                match node {
                    Node::Leaf(sym) => { levels.push((level, sym)); },
                    Node::Fork(l,r) => {
                        todo.push((&tree[*l], level + 1));
                        todo.push((&tree[*r], level + 1));
                    },
                }
            }

            // Canonical assignment: consecutive codes within a length, shifted left whenever
            // the length grows.
            levels.sort_by_key(|&(level, _)| level);
            let mut code: u64 = 0;
            let mut prev_level = 0;
            let mut encode = BTreeMap::new();
            let mut decode = Decode::map();
            for (level, sym) in levels {
                assert!(
                    level <= MAX_CODE_BITS,
                    "Huffman codeword of {} bits exceeds the supported {} bits",
                    level,
                    MAX_CODE_BITS,
                );
                if prev_level != level {
                    code <<= level - prev_level;
                    prev_level = level;
                }
                encode.insert(sym.clone(), (level, code));
                Self::insert_decode(&mut decode, sym, level, code << (64-level));

                code += 1;
            }

            for (index, entry) in decode.iter().enumerate() {
                if entry.any_void() {
                    panic!("VOID FOUND: {:?}", index);
                }
            }

            Huffman {
                encode,
                decode,
            }
        }

        /// Records `symbol` in `map` for the `bits`-bit codeword left-aligned in `code`,
        /// descending into nested tables for codewords longer than eight bits.
        fn insert_decode(map: &mut [Decode<T>; 256], symbol: &T, bits: usize, code: u64) where T: Clone {
            let byte: u8 = (code >> 56).try_into().unwrap();
            if bits <= 8 {
                // Every byte that starts with this codeword decodes to `symbol`.
                for off in 0 .. (1 << (8 - bits)) {
                    map[(byte as usize) + off] = Decode::Symbol(symbol.clone(), bits);
                }
            }
            else {
                if let Decode::Void = &map[byte as usize] {
                    map[byte as usize] = Decode::Further(Box::new(Decode::map()));
                }
                if let Decode::Further(next_map) = &mut map[byte as usize] {
                    Self::insert_decode(next_map, symbol, bits - 8, code << 8);
                }
            }
        }
    }

    /// Tree structure for Huffman bit length determination.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
    enum Node<T> {
        Leaf(T),
        Fork(usize, usize),
    }

    /// One slot of a byte-indexed decoding table.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
    pub enum Decode<T> {
        /// An as-yet unfilled slot.
        #[default]
        Void,
        /// The symbol, and the number of bits consumed.
        Symbol(T, usize),
        /// An additional map to push subsequent bytes at.
        Further(Box<[Decode<T>; 256]>),
    }

    impl<T> Decode<T> {
        /// Tests whether this entry (or any nested table) still has an unfilled slot.
        ///
        /// A correctly initialized map has none; a map with unfilled slots cannot decode
        /// some input byte sequences.
        fn any_void(&self) -> bool {
            match self {
                Decode::Void => true,
                Decode::Symbol(_,_) => false,
                Decode::Further(map) => map.iter().any(|m| m.any_void()),
            }
        }
        /// Creates a new table with every slot unfilled.
        fn map() -> [Decode<T>; 256] {
            let mut vec = Vec::with_capacity(256);
            for _ in 0 .. 256 {
                vec.push(Decode::Void);
            }
            vec.try_into().ok().unwrap()
        }
    }


    /// A tabled Huffman decoder, written as an iterator.
    mod decoder {

        use super::Decode;

        /// Decodes a byte stream produced by the matching encoder.
        ///
        /// One byte of lookahead lets the decoder recognise the final byte, whose trailing
        /// `1` marker bit and zero padding are discarded rather than decoded.
        #[derive(Copy, Clone)]
        pub struct Decoder<'a, T, I> {
            decode: &'a [Decode<T>; 256],
            bytes: I,
            /// The next unread byte, or `None` once the input is exhausted.
            lookahead: Option<u8>,
            /// Whether `lookahead` has been filled from `bytes` yet.
            primed: bool,
            /// Buffered data bits, right-aligned; only the low `pending_bits` bits are set.
            pending_byte: u16,
            pending_bits: usize,
        }

        impl<'a, T, I> Decoder<'a, T, I> {
            pub fn new(decode: &'a [Decode<T>; 256], bytes: I) -> Self {
                Self {
                    decode,
                    bytes,
                    lookahead: None,
                    primed: false,
                    pending_byte: 0,
                    pending_bits: 0,
                }
            }
        }

        impl<'a, T, I: Iterator<Item=u8>> Decoder<'a, T, I> {
            /// Moves input bytes into the bit buffer until at least eight bits are pending
            /// or the input is exhausted, dropping the final byte's marker and padding.
            fn refill(&mut self) {
                if !self.primed {
                    self.lookahead = self.bytes.next();
                    self.primed = true;
                }
                while self.pending_bits < 8 {
                    let Some(byte) = self.lookahead else { return };
                    self.lookahead = self.bytes.next();
                    let data_bits = if self.lookahead.is_some() {
                        8
                    } else {
                        // Final byte: data bits, then a `1` marker, then zero padding.
                        // A zero byte has no marker and is treated as carrying no data.
                        8 - (byte.trailing_zeros() as usize + 1).min(8)
                    };
                    let data = u16::from(byte) >> (8 - data_bits);
                    self.pending_byte = (self.pending_byte << data_bits) | data;
                    self.pending_bits += data_bits;
                }
            }

            /// Discards the leading `bits` of the pending bits.
            fn consume(&mut self, bits: usize) {
                self.pending_bits -= bits;
                self.pending_byte &= (1 << self.pending_bits) - 1;
            }
        }

        impl<'a, T, I> Iterator for Decoder<'a, T, I>
        where
            I: Iterator<Item=u8>,
        {
            type Item = &'a T;
            fn next(&mut self) -> Option<&'a T> {
                // Walk the (possibly nested) tables, restocking bits as we go.
                let mut map = self.decode;
                loop {
                    self.refill();
                    if self.pending_bits == 0 {
                        return None;
                    }
                    // Peek at the next eight bits; near the end fewer may remain, in which
                    // case the table is probed with zero bits appended.
                    let byte = usize::from(if self.pending_bits >= 8 {
                        self.pending_byte >> (self.pending_bits - 8)
                    } else {
                        self.pending_byte << (8 - self.pending_bits)
                    });
                    match &map[byte] {
                        Decode::Void => { panic!("invalid decoding map"); }
                        Decode::Symbol(symbol, bits) => {
                            // A codeword longer than the remaining data means the stream
                            // ended mid-symbol.
                            if *bits > self.pending_bits {
                                return None;
                            }
                            self.consume(*bits);
                            return Some(symbol);
                        }
                        Decode::Further(next_map) => {
                            if self.pending_bits < 8 {
                                return None;
                            }
                            self.consume(8);
                            map = next_map;
                        }
                    }
                }
            }
        }
    }

    /// A tabled Huffman encoder, written as an iterator.
    mod encoder {

        use std::collections::BTreeMap;

        /// Encodes symbols into bytes, terminating the stream with a `1` marker bit and
        /// zero padding to a byte boundary.
        #[derive(Copy, Clone)]
        pub struct Encoder<'a, T, I> {
            encode: &'a BTreeMap<T, (usize, u64)>,
            symbols: I,
            /// Buffered output bits, right-aligned; only the low `pending_bits` bits are set.
            pending_byte: u64,
            pending_bits: usize,
            /// Set once `symbols` is exhausted and the end marker has been appended.
            finished: bool,
        }

        impl<'a, T, I> Encoder<'a, T, I> {
            pub fn new(encode: &'a BTreeMap<T, (usize, u64)>, symbols: I) -> Self {
                Self {
                    encode,
                    symbols,
                    pending_byte: 0,
                    pending_bits: 0,
                    finished: false,
                }
            }
        }

        impl<'a, T: Ord, I> Iterator for Encoder<'a, T, I>
        where
            I: Iterator<Item=&'a T>,
        {
            type Item = u8;
            fn next(&mut self) -> Option<u8> {
                // Ship bytes out of `pending_byte`, restocking from `symbols`.
                while self.pending_bits < 8 {
                    if self.finished {
                        // Ship the final fractional byte, zero-padded on the right.
                        if self.pending_bits == 0 {
                            return None;
                        }
                        let byte = self.pending_byte << (8 - self.pending_bits);
                        self.pending_bits = 0;
                        self.pending_byte = 0;
                        return Some(byte as u8);
                    }
                    match self.symbols.next() {
                        Some(symbol) => {
                            let &(bits, code) = self
                                .encode
                                .get(symbol)
                                .expect("symbol absent from Huffman codebook");
                            self.pending_byte = (self.pending_byte << bits) | code;
                            self.pending_bits += bits;
                        }
                        None => {
                            // A single `1` bit marks the end of the data so the decoder can
                            // tell it apart from the zero padding that follows.
                            self.pending_byte = (self.pending_byte << 1) | 1;
                            self.pending_bits += 1;
                            self.finished = true;
                        }
                    }
                }

                let byte = self.pending_byte >> (self.pending_bits - 8);
                self.pending_bits -= 8;
                self.pending_byte &= (1 << self.pending_bits) - 1;
                Some(byte as u8)
            }
        }
    }

}