shuffly 0.1.3

Increases compressibility of data with fixed-sized records.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
//! [<img alt="build" src="https://img.shields.io/github/workflow/status/VeaaC/shuffly/Shuffly%20CI/main?style=for-the-badge">](https://github.com/VeaaC/shuffly/actions?query=branch%3Amain)
//! [<img alt="docs" src="https://img.shields.io/docsrs/shuffly?style=for-the-badge">](https://docs.rs/shuffly)
//! [<img alt="package" src="https://img.shields.io/crates/v/shuffly?style=for-the-badge">](https://crates.io/crates/shuffly)
//!
//! Increases compressibility of data with fixed-sized records.
//!
//! `Shuffly` detects fixed-sized data patterns by trying out different
//! pattern sizes (by default 1 to 64 bytes). For each pattern it reorders bytes
//! such that byte X of each record is grouped together, and stores deltas of
//! these bytes instead of the original data.
//!
//! The resulting data stream is much more compressible for pattern based
//! compression algorithms like deflate (gz, zip, etc), zstd, or lzma.
//!
//! Shuffly is available both as a command line app (e.g. `cargo install shuffly`),
//! and a library.
//!
//! Library usage
//! ```no_run
//! # let input = "foo";
//! # let output = "bar";
//! # let encode = true;
//! # use std::fs::File;
//! let input: shuffly::Input = Box::new(File::open(input).unwrap());
//! let output: shuffly::Output = Box::new(File::create(output).unwrap());
//! let options = shuffly::Options::new();
//! if encode {
//!     shuffly::encode(0, input, output, &shuffly::Options::new()).unwrap();
//! } else {
//!     shuffly::decode(0, input, output).unwrap();
//! }
//! ```

use std::io::prelude::*;

use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::collections::HashMap;

use crossbeam::channel;

use lz4::EncoderBuilder;

// Magic tag identifying the SHUFFLY container format; written by `encode`
// and validated by `decode`.
const TAG: [u8; 7] = *b"SHUFFLY";
// File header layout: magic tag followed by the block size as a big-endian u64.
const HEADER_SIZE: usize = TAG.len() + 0_u64.to_be_bytes().len();
// Per-block header layout: the stride stored as a big-endian u16.
const STRIDE_SIZE: usize = 0_u16.to_be_bytes().len();

/// Lists (non-exhaustively) all possible error scenarios
#[non_exhaustive]
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The worker thread pool could not be created, or an internal channel
    /// closed unexpectedly.
    #[error("Failed to create threads")]
    ThreadPool,
    /// Reading from the input stream failed.
    #[error("Failed reading input: {0}")]
    Input(std::io::Error),
    /// Writing to the output stream failed.
    #[error("Failed writing output: {0}")]
    Output(std::io::Error),
    /// The input data is not in the SHUFFLY format.
    #[error("Encoding is not in SHUFFLY format")]
    Encoding,
}

/// Runs `produce` over the items of `iter` on `threads` worker threads while
/// delivering results to `consume` strictly in input order.
///
/// Pipeline layout: one spawned thread pulls items from `iter`, the workers
/// run `produce`, and the scope's calling thread runs `consume`, buffering
/// out-of-order results in a `BTreeMap`. A token channel caps the number of
/// in-flight items at `2 * threads` so memory use stays bounded even when
/// the producers outpace the consumer.
///
/// Returns the first error reported by `consume`, or `Error::ThreadPool` if
/// the pool cannot be built or an internal channel closes early.
fn parallel_process<Iter, Item, Producer, Data, Consumer>(
    threads: usize,
    iter: Iter,
    produce: Producer,
    mut consume: Consumer,
) -> Result<(), Error>
where
    Iter: Iterator<Item = Item> + Send,
    Item: Send,
    Producer: Fn(Item) -> Data + Sync,
    Data: Send,
    Error: Send,
    Consumer: FnMut(Data) -> Result<(), Error> + Send,
{
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(threads + 2) // we need 2 extra threads for blocking I/O
        .build()
        .map_err(|_| Error::ThreadPool)?;

    let num_tokens = 2 * threads;

    // tokens limit how many items may be in flight; the other two channels
    // carry the actual work and its results, each tagged with its index
    let (token_sender, token_reciver) = channel::bounded(num_tokens);
    let (iter_sender, iter_receiver) = channel::bounded(num_tokens);
    let (data_sender, data_receiver) = channel::bounded(num_tokens);

    pool.scope(|s| {
        // feeder thread: waits for a token before forwarding the next
        // (index, item) pair; stops as soon as either channel is closed
        s.spawn(|_| {
            for x in iter.enumerate() {
                if token_reciver.recv().is_err() {
                    break;
                }
                if iter_sender.send(x).is_err() {
                    break;
                }
            }
            std::mem::drop(iter_sender);
        });

        // worker threads: transform each item, keeping its index attached
        for _ in 0..threads {
            let data_sender = data_sender.clone();
            s.spawn(|_| {
                let data_sender = data_sender;
                while let Ok((i, item)) = iter_receiver.recv() {
                    let data = produce(item);
                    if data_sender.send((i, data)).is_err() {
                        break;
                    }
                }
            });
        }
        drop(data_sender); // drop to make sure iteration will finish once all senders are out of scope

        // we need to move these into the scope so they are dropped on failure
        let token_sender = token_sender;
        let data_receiver = data_receiver;
        // out-of-order results, keyed by Reverse(index) so that
        // remove(&Reverse(next_idx)) pops the next expected item
        let mut pending = BTreeMap::new();
        let mut next_idx = 0;
        // hand out the initial tokens to start the pipeline
        for _ in 0..num_tokens {
            if token_sender.send(()).is_err() {
                return Err(Error::ThreadPool);
            }
        }
        for result in data_receiver {
            pending.insert(Reverse(result.0), result.1);
            // drain every result that is now contiguous with the output order
            while let Some(data) = pending.remove(&Reverse(next_idx)) {
                // return a token so another item may enter the pipeline
                if token_sender.send(()).is_err() {
                    return Err(Error::ThreadPool);
                }

                next_idx += 1;
                consume(data)?;
            }
        }
        Ok(())
    })
}

/// Type-erased input stream consumed by [`encode`] and [`decode`].
pub type Input = Box<dyn std::io::Read + Send + Sync>;
/// Type-erased output stream written by [`encode`] and [`decode`].
pub type Output = Box<dyn std::io::Write + Send + Sync>;

/// Read data until we have read a full block or reached the end of the input.
///
/// Returns a closure suitable for `std::iter::from_fn`: each call yields the
/// next block (the last one may be short), or `None` once the input is
/// exhausted. Read errors are yielded as `Err` items.
fn read_block(
    mut input: Input,
    block_size: usize,
) -> impl FnMut() -> Option<Result<Vec<u8>, Error>> {
    let mut exhausted = false;
    move || {
        if exhausted {
            return None;
        }
        let mut block = vec![0_u8; block_size];
        // fill the buffer as far as the input allows
        let mut filled = 0;
        while filled < block.len() {
            match input.read(&mut block[filled..]) {
                Err(e) => return Some(Err(Error::Input(e))),
                Ok(0) => {
                    // end of input reached; remember so the next call yields None
                    exhausted = true;
                    break;
                }
                Ok(n) => filled += n,
            }
        }

        block.truncate(filled);
        if block.is_empty() {
            None
        } else {
            Some(Ok(block))
        }
    }
}

/// Finds the most promising stride by trying every candidate on a prefix of
/// the data and picking the one whose shuffled form compresses smallest.
/// Ties are broken in favor of the smaller stride; if no candidate can be
/// evaluated, stride 0 ("store unchanged") is returned.
fn find_best_stride(buf: &[u8], strides: &[u16]) -> usize {
    // a 4 KiB sample is enough to rank strides without shuffling everything
    let sample = &buf[..buf.len().min(1024 * 4)];
    let mut best: Option<(usize, u16)> = None;
    for &stride in strides {
        let candidate = shuffle(sample, stride as usize);
        if let Some(size) = compressability(&candidate) {
            let entry = (size, stride);
            // strict '<' keeps the first (smallest-stride) winner on ties,
            // matching lexicographic minimum over (size, stride)
            if best.map_or(true, |current| entry < current) {
                best = Some(entry);
            }
        }
    }
    best.unwrap_or((0, 0)).1 as usize
}

fn shuffle(buf: &[u8], stride: usize) -> Vec<u8> {
    let mut out_buf = Vec::with_capacity(buf.len() + STRIDE_SIZE);
    // write block header
    out_buf.extend((stride as u16).to_be_bytes() as [u8; STRIDE_SIZE]);

    if stride == 0 {
        out_buf.extend(buf);
    } else {
        // Transform payload:
        // * reorder bytes so that bytes X of each stride comes before bytes X+1 of any other stride
        // * compute deltas between bytes
        for i in 0..stride {
            let mut previous = 0;
            for x in buf.iter().skip(i).step_by(stride) {
                out_buf.push(x.wrapping_sub(previous));
                previous = *x;
            }
        }
    }
    out_buf
}

fn deshuffle(buf: &[u8]) -> Result<Vec<u8>, Error> {
    let stride = match buf.get(0..STRIDE_SIZE) {
        None => return Err(Error::Encoding),
        Some(x) => u16::from_be_bytes(x.try_into().unwrap()) as usize,
    };

    let buf = &buf[STRIDE_SIZE..];

    if stride == 0 {
        return Ok(buf.into());
    }
    let mut out_buf = vec![0_u8; buf.len()];
    let mut iter = buf.iter();
    for i in 0..stride {
        let mut previous: u8 = 0;
        for pos in out_buf.iter_mut().skip(i).step_by(stride) {
            // we cannot fail here, since out_buf was sized properly
            let x = iter.next().unwrap();
            previous = previous.wrapping_add(*x);
            *pos = previous;
        }
    }

    Ok(out_buf)
}

/// Estimates how well `buf` compresses by running it through a fast LZ4
/// encoder and returning the compressed size; `None` if encoding fails.
fn compressability(buf: &[u8]) -> Option<usize> {
    let mut encoder = EncoderBuilder::new()
        .level(4)
        .build(Vec::new())
        .expect("Invalid compression level");
    encoder.write_all(buf).ok()?;
    let (compressed, result) = encoder.finish();
    // only report a size when the encoder finished cleanly
    result.ok().map(|_| compressed.len())
}

/// Statistics about the encoding process
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Stats {
    /// Which strides were used how often, as (stride, block count) pairs
    /// sorted by stride.
    pub strides: Vec<(usize, u64)>,
}

/// Options passed to the encoder
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Options {
    /// Size in bytes of each independently encoded block.
    pub block_size: usize,
    /// Candidate strides to try per block; 0 means "store unchanged".
    pub strides: Vec<u16>,
}

impl Options {
    /// Creates the default options: 4 MiB blocks, trying every stride from
    /// 0 (no transform) up to 64 bytes.
    pub fn new() -> Self {
        Self {
            block_size: 4 * 1024 * 1024,
            strides: (0..=64).collect(),
        }
    }
}

impl Default for Options {
    // Delegates to `new` so `Options::default()` and `Options::new()` agree.
    fn default() -> Self {
        Self::new()
    }
}

/// Encodes data by searching for fixed-sized correlations in the data and
/// shuffling data around accordingly.
/// At least 3 threads will be used, one for input, one for output, and one for encoding
pub fn encode(
    threads: usize,
    input: Input,
    mut output: Output,
    options: &Options,
) -> Result<Stats, Error> {
    // write header
    output.write_all(b"SHUFFLY").map_err(Error::Output)?;
    output
        .write_all(&(options.block_size as u64).to_be_bytes())
        .map_err(Error::Output)?;
    let mut strides = HashMap::new();
    parallel_process(
        threads,
        std::iter::from_fn(read_block(input, options.block_size)),
        |buf: Result<Vec<u8>, Error>| {
            let buf = buf?;
            let stride = find_best_stride(&buf, &options.strides);
            Ok((stride, shuffle(&buf, stride)))
        },
        |data| {
            let (stride, buf) = data?;
            *strides.entry(stride).or_insert(0) += 1;
            output.write_all(&buf).map_err(Error::Output)
        },
    )?;
    let mut strides: Vec<_> = strides.into_iter().collect();
    strides.sort_unstable();
    Ok(Stats { strides })
}

/// Decoded data that has previously been encoded.
/// At least 3 threads will be used, one for input, one for output, and one for encoding
pub fn decode(threads: usize, mut input: Input, mut output: Output) -> Result<(), Error> {
    // read header
    let mut header: [u8; HEADER_SIZE] = [0; HEADER_SIZE];
    input.read_exact(&mut header).map_err(Error::Input)?;
    if header[0..TAG.len()] != TAG {
        return Err(Error::Encoding);
    }

    // unwrapping here since we cannot fail
    let block_size = u64::from_be_bytes(header[TAG.len()..].try_into().unwrap());
    let block_size: usize = match block_size.try_into() {
        Err(_) => return Err(Error::Encoding),
        Ok(x) => x,
    };
    parallel_process(
        threads,
        std::iter::from_fn(read_block(input, block_size + STRIDE_SIZE)),
        |buf: Result<Vec<u8>, Error>| {
            let buf = buf?;
            deshuffle(&buf)
        },
        |buf| output.write_all(&buf?).map_err(Error::Output),
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    // `Write` sink that records everything written into a shared buffer,
    // letting tests inspect encoder/decoder output after the Box is consumed.
    #[derive(Default, Clone)]
    struct CaptureWrite {
        data: Arc<Mutex<Vec<u8>>>,
    }

    impl Write for CaptureWrite {
        fn write(&mut self, value: &[u8]) -> std::result::Result<usize, std::io::Error> {
            self.data.lock().unwrap().write(value)
        }
        fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
            Ok(())
        }
    }

    // Round-trips patterned data through encode + decode for a range of
    // sizes (including 0 and sizes that are not multiples of the pattern or
    // of the deliberately small block size).
    #[test]
    fn end2end() {
        for size in [0, 5, 7, 13, 64, 128, 1024, 1024 * 1024] {
            // synthesize data with a 5-byte repeating structure
            let input: Vec<_> = (0_u64..size)
                .map(|i| match i % 5 {
                    0 => i,
                    1 => i * 2 + 399,
                    2 => i * 3 + 300,
                    3 => i * i,
                    _ => i + i * i,
                } as u8)
                .collect();
            println!("Input: {:?}", &input);

            let shuffled = CaptureWrite::default();
            encode(
                2,
                Box::new(std::io::Cursor::new(input.clone())),
                Box::new(shuffled.clone()),
                &Options {
                    // tiny blocks force multiple blocks and a short tail block
                    block_size: 19,
                    ..Options::new()
                },
            )
            .expect("Failed to encode");
            let shuffled = Arc::try_unwrap(shuffled.data)
                .unwrap()
                .into_inner()
                .unwrap();
            println!("Shuffled: {:?}", &shuffled);

            let deshuffled = CaptureWrite::default();
            decode(
                2,
                Box::new(std::io::Cursor::new(shuffled.clone())),
                Box::new(deshuffled.clone()),
            )
            .expect("Failed to decode");
            let deshuffled = Arc::try_unwrap(deshuffled.data)
                .unwrap()
                .into_inner()
                .unwrap();
            println!("Deshuffled: {:?}", &deshuffled);

            assert!(deshuffled == input);
        }
    }

    // Checks that deshuffle exactly inverts shuffle for a matching stride,
    // without going through the block/header machinery.
    #[test]
    fn shuffle_deshuffle() {
        for size in [0, 5, 7, 13, 64, 128, 1024] {
            let input: Vec<_> = (0_u64..size)
                .map(|i| match i % 5 {
                    0 => i,
                    1 => i * 2 + 399,
                    2 => i * 3 + 300,
                    3 => i * i,
                    _ => i + i * i,
                } as u8)
                .collect();

            let shuffled = shuffle(&input, 5);
            let deshuffled = deshuffle(&shuffled).unwrap();

            assert_eq!(
                deshuffled, input,
                "\n{:?} -> {:?} -> {:?}",
                input, shuffled, deshuffled
            );
        }
    }
}