Skip to main content

lz4/
encoder.rs

1//! `Write`-based streaming encoder wrapping the LZ4 frame API
2//! (`LZ4F_compressBegin` / `LZ4F_compressUpdate` / `LZ4F_compressEnd`).
3//!
4//! [`Encoder`] consumes an inner [`Write`] sink and emits a complete `.lz4`
5//! frame: a frame header (written eagerly on `build`), a sequence of
6//! compressed blocks (flushed as the input fills the block-size buffer or on
7//! explicit [`flush`](Encoder::flush)), and a frame footer (written by
8//! [`finish`](Encoder::finish)). [`EncoderBuilder`] configures the frame
9//! preferences (block size, block mode, checksums, compression level, etc.)
10//! before constructing the encoder.
11//!
12//! The API mirrors the `lz4-rs` crate's [`Encoder`] / [`EncoderBuilder`] so
13//! that this crate is a drop-in replacement.
14
15use super::liblz4::*;
16use super::size_t;
17use std::cmp;
18use std::io::Result;
19use std::io::Write;
20use std::ptr;
21
/// RAII wrapper around an `LZ4F_cctx*` that frees the context on drop.
#[derive(Debug)]
struct EncoderContext {
    // Compression context handle: filled in by `EncoderContext::new` and
    // passed to `LZ4F_freeCompressionContext` when this wrapper is dropped.
    c: LZ4FCompressionContext,
}
27
/// Builder for [`Encoder`] that collects LZ4 frame preferences before
/// constructing the streaming encoder.
///
/// Preferences correspond to the fields of upstream `LZ4F_preferences_t`
/// (see `lz4frame.h`). Defaults match the `lz4-rs` crate, not the upstream
/// `lz4` CLI; for CLI-compatible defaults see `src/bin/lz4.rs`.
#[derive(Clone, Debug)]
pub struct EncoderBuilder {
    // Maximum block size; `build` also uses its byte size to dimension the
    // encoder's input staging buffer and output scratch buffer.
    block_size: BlockSize,
    // Linked vs. independent blocks; copied into the frame info by `build`.
    block_mode: BlockMode,
    // Whether each block is followed by a checksum of the block's compressed
    // data. `EncoderBuilder::new` enables this by default.
    block_checksum: BlockChecksum,
    // Whether a checksum of the full content is written with the frame.
    // `EncoderBuilder::new` enables this by default.
    checksum: ContentChecksum,
    // 0 == default (fast mode). Negative levels cannot be expressed because
    // the field is unsigned.
    // NOTE(review): an earlier comment here said values above 16 count as 16,
    // while the `level` setter documents clamping above 12 — confirm against
    // the LZ4 version actually linked.
    level: u32,
    // Forwarded to the frame preferences as 1/0 (always flush, reducing the
    // need for a tmp buffer). When set, `Encoder::write` also skips its own
    // input staging buffer.
    auto_flush: bool,
    // Forwarded to the frame preferences as 1/0; see `favor_dec_speed`.
    favor_dec_speed: bool,
    // Uncompressed content size recorded in the frame info; 0 means unknown.
    content_size: u64,
}
48
/// Streaming LZ4 frame encoder wrapping an inner [`Write`] sink.
///
/// Bytes written to the encoder are buffered up to the configured block
/// size and then handed to `LZ4F_compressUpdate`, which emits compressed
/// blocks into `w`. Always call [`finish`](Encoder::finish) to flush the
/// final block and write the frame footer — dropping the encoder without
/// calling `finish` produces a truncated frame.
#[derive(Debug)]
pub struct Encoder<W> {
    // Owned compression context; released when the encoder is dropped.
    c: EncoderContext,
    // Inner sink that receives the frame header, compressed blocks and footer.
    w: W,
    // Block size in bytes (set from `BlockSize::get_size` in `build`); the
    // largest amount of input handed to the compressor in one call on the
    // buffered path.
    limit: usize,
    // Mirrors the builder's auto-flush setting; when true, `write` bypasses
    // `input` and compresses the caller's data directly.
    auto_flush: bool,
    // Staging buffer for uncompressed bytes not yet handed to the compressor
    // (allocated with capacity `limit`).
    input: Vec<u8>,
    // Output scratch buffer; its capacity comes from `LZ4F_compressBound` for
    // one block of input, and its length is set to whatever the last LZ4F
    // call reported as written.
    buffer: Vec<u8>,
}
65
66impl EncoderBuilder {
67    /// Returns a builder pre-populated with the same defaults used by the
68    /// `lz4-rs` crate (linked blocks, content + block checksums enabled,
69    /// fast-mode compression).
70    pub fn new() -> Self {
71        EncoderBuilder {
72            block_size: BlockSize::Default,
73            block_mode: BlockMode::Linked,
74            checksum: ContentChecksum::ChecksumEnabled,
75            block_checksum: BlockChecksum::BlockChecksumEnabled,
76            level: 0,
77            auto_flush: false,
78            favor_dec_speed: false,
79            content_size: 0,
80        }
81    }
82
83    /// Selects the maximum block size (64 KiB, 256 KiB, 1 MiB, or 4 MiB).
84    /// Larger blocks generally improve compression ratio at the cost of
85    /// memory.
86    pub fn block_size(&mut self, block_size: BlockSize) -> &mut Self {
87        self.block_size = block_size;
88        self
89    }
90
91    /// Sets whether blocks may reference data from previous blocks
92    /// ([`BlockMode::Linked`]) or must each compress independently
93    /// ([`BlockMode::Independent`]).
94    pub fn block_mode(&mut self, block_mode: BlockMode) -> &mut Self {
95        self.block_mode = block_mode;
96        self
97    }
98
99    /// Enables or disables the per-block xxHash32 trailer that follows each
100    /// compressed block.
101    pub fn block_checksum(&mut self, block_checksum: BlockChecksum) -> &mut Self {
102        self.block_checksum = block_checksum;
103        self
104    }
105
106    /// Enables or disables the xxHash32 checksum of the full decompressed
107    /// content that is appended to the frame footer.
108    pub fn checksum(&mut self, checksum: ContentChecksum) -> &mut Self {
109        self.checksum = checksum;
110        self
111    }
112
113    /// Sets the compression level. `0` selects fast mode; levels `3..=12`
114    /// route through HC. Levels above 12 are clamped by the underlying
115    /// implementation.
116    pub fn level(&mut self, level: u32) -> &mut Self {
117        self.level = level;
118        self
119    }
120
121    /// When `true`, every [`Encoder::write`] call is flushed to the inner
122    /// writer instead of being buffered, reducing internal memory usage at
123    /// some cost in compression ratio for small writes.
124    pub fn auto_flush(&mut self, auto_flush: bool) -> &mut Self {
125        self.auto_flush = auto_flush;
126        self
127    }
128
129    /// Favor decompression speed over compression ratio. Requires compression
130    /// level >=10.
131    pub fn favor_dec_speed(&mut self, favor_dec_speed: bool) -> &mut Self {
132        self.favor_dec_speed = favor_dec_speed;
133        self
134    }
135
136    /// Records the total uncompressed content size in the frame header.
137    /// Use `0` (the default) when the size is unknown ahead of time.
138    pub fn content_size(&mut self, content_size: u64) -> &mut Self {
139        self.content_size = content_size;
140        self
141    }
142
143    /// Consumes the builder, allocates an LZ4 frame compression context,
144    /// writes the frame header into `w`, and returns the resulting
145    /// streaming encoder.
146    pub fn build<W: Write>(&self, w: W) -> Result<Encoder<W>> {
147        let block_size = self.block_size.get_size();
148        let preferences = LZ4FPreferences {
149            frame_info: LZ4FFrameInfo {
150                block_size_id: self.block_size.clone(),
151                block_mode: self.block_mode.clone(),
152                content_checksum_flag: self.checksum.clone(),
153                content_size: self.content_size,
154                frame_type: FrameType::Frame,
155                dict_id: 0,
156                block_checksum_flag: self.block_checksum.clone(),
157            },
158            compression_level: self.level,
159            auto_flush: if self.auto_flush { 1 } else { 0 },
160            favor_dec_speed: if self.favor_dec_speed { 1 } else { 0 },
161            reserved: [0; 3],
162        };
163        let mut encoder = Encoder {
164            w,
165            c: EncoderContext::new()?,
166            limit: block_size,
167            auto_flush: self.auto_flush,
168            input: Vec::with_capacity(block_size),
169            buffer: Vec::with_capacity(check_error(unsafe {
170                LZ4F_compressBound(block_size as size_t, &preferences)
171            })?),
172        };
173        encoder.write_header(&preferences)?;
174        Ok(encoder)
175    }
176}
177
178impl Default for EncoderBuilder {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184impl<W: Write> Encoder<W> {
185    fn write_header(&mut self, preferences: &LZ4FPreferences) -> Result<()> {
186        unsafe {
187            let len = check_error(LZ4F_compressBegin(
188                self.c.c,
189                self.buffer.as_mut_ptr(),
190                self.buffer.capacity() as size_t,
191                preferences,
192            ))?;
193            self.buffer.set_len(len);
194        }
195        self.w.write_all(&self.buffer)
196    }
197
198    fn write_end(&mut self) -> Result<()> {
199        self.write_pending()?;
200        unsafe {
201            let len = check_error(LZ4F_compressEnd(
202                self.c.c,
203                self.buffer.as_mut_ptr(),
204                self.buffer.capacity() as size_t,
205                ptr::null(),
206            ))?;
207            self.buffer.set_len(len);
208        };
209        self.w.write_all(&self.buffer)
210    }
211
212    /// Immutable writer reference.
213    pub fn writer(&self) -> &W {
214        &self.w
215    }
216
217    fn write_update(&mut self, input: &[u8]) -> Result<()> {
218        unsafe {
219            let len = check_error(LZ4F_compressUpdate(
220                self.c.c,
221                self.buffer.as_mut_ptr(),
222                self.buffer.capacity() as size_t,
223                input.as_ptr(),
224                input.len() as size_t,
225                ptr::null(),
226            ))?;
227            self.buffer.set_len(len);
228        }
229        self.w.write_all(&self.buffer)
230    }
231
232    fn write_pending(&mut self) -> Result<()> {
233        if self.input.is_empty() {
234            return Ok(());
235        }
236        unsafe {
237            let len = check_error(LZ4F_compressUpdate(
238                self.c.c,
239                self.buffer.as_mut_ptr(),
240                self.buffer.capacity() as size_t,
241                self.input.as_ptr(),
242                self.input.len() as size_t,
243                ptr::null(),
244            ))?;
245            self.buffer.set_len(len);
246        }
247        self.input.clear();
248        self.w.write_all(&self.buffer)
249    }
250
251    /// Finalises the LZ4 frame.
252    ///
253    /// Flushes any buffered input, invokes `LZ4F_compressEnd` to write the
254    /// frame footer (and content checksum if enabled), and returns the
255    /// wrapped writer together with the result of writing those final
256    /// bytes. Always call this; dropping the encoder without `finish`
257    /// produces a truncated, undecodable frame.
258    pub fn finish(mut self) -> (W, Result<()>) {
259        let result = self.write_end();
260        (self.w, result)
261    }
262}
263
264impl<W: Write> Write for Encoder<W> {
265    fn write(&mut self, buffer: &[u8]) -> Result<usize> {
266        if self.auto_flush {
267            self.write_update(buffer)?;
268            return Ok(buffer.len());
269        }
270
271        let mut offset = 0;
272        while offset < buffer.len() {
273            let remaining = buffer.len() - offset;
274            if self.input.is_empty() && remaining >= self.limit {
275                let end = offset + self.limit;
276                self.write_update(&buffer[offset..end])?;
277                offset = end;
278                continue;
279            }
280
281            let available = self.limit - self.input.len();
282            let size = cmp::min(remaining, available);
283            self.input.extend_from_slice(&buffer[offset..offset + size]);
284            offset += size;
285            if self.input.len() == self.limit {
286                self.write_pending()?;
287            }
288        }
289        Ok(buffer.len())
290    }
291
292    fn flush(&mut self) -> Result<()> {
293        self.write_pending()?;
294        loop {
295            unsafe {
296                let len = check_error(LZ4F_flush(
297                    self.c.c,
298                    self.buffer.as_mut_ptr(),
299                    self.buffer.capacity() as size_t,
300                    ptr::null(),
301                ))?;
302                if len == 0 {
303                    break;
304                }
305                self.buffer.set_len(len);
306            };
307            self.w.write_all(&self.buffer)?;
308        }
309        self.w.flush()
310    }
311}
312
313impl EncoderContext {
314    fn new() -> Result<EncoderContext> {
315        let mut context = LZ4FCompressionContext(ptr::null_mut());
316        check_error(unsafe { LZ4F_createCompressionContext(&mut context, LZ4F_VERSION) })?;
317        Ok(EncoderContext { c: context })
318    }
319}
320
321impl Drop for EncoderContext {
322    fn drop(&mut self) {
323        unsafe { LZ4F_freeCompressionContext(self.c) };
324    }
325}
326
#[cfg(test)]
mod test {
    use super::EncoderBuilder;
    use crate::liblz4::{BlockChecksum, ContentChecksum};
    use std::io::{Read, Write};

    /// Produces 1 MiB of deterministic pseudo-random bytes from a linear
    /// congruential generator seeded with 42.
    fn pseudo_random_input() -> Vec<u8> {
        let mut state: u32 = 42;
        let mut bytes = Vec::new();
        for _ in 0..1024 * 1024 {
            bytes.push((state & 0xFF) as u8);
            state = (1664525_u64 * (state as u64) + 1013904223_u64) as u32;
        }
        bytes
    }

    /// Decodes a complete frame with the crate's decoder and returns the
    /// decompressed bytes.
    fn decode_all(compressed: &[u8]) -> Vec<u8> {
        let mut decoder = crate::decoder::Decoder::new(compressed).unwrap();
        let mut plain = Vec::new();
        decoder.read_to_end(&mut plain).unwrap();
        plain
    }

    /// Pushes the pseudo-random input through `encoder`, finishes the frame,
    /// and checks that decoding yields the input again.
    fn assert_round_trip(mut encoder: super::Encoder<Vec<u8>>) {
        let input = pseudo_random_input();
        encoder.write_all(&input).unwrap();
        let (compressed, result) = encoder.finish();
        result.unwrap();
        assert_eq!(input, decode_all(&compressed));
    }

    #[test]
    fn test_encoder_smoke() {
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        for piece in [&b"Some "[..], &b"data"[..]] {
            encoder.write_all(piece).unwrap();
        }
        let (_, result) = encoder.finish();
        result.unwrap();
    }

    #[test]
    fn test_encoder_random() {
        let encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        assert_round_trip(encoder);
    }

    #[test]
    fn test_encoder_content_size() {
        let encoder = EncoderBuilder::new()
            .level(1)
            .content_size(1024 * 1024)
            .build(Vec::new())
            .unwrap();
        assert_round_trip(encoder);
    }

    #[test]
    fn test_encoder_auto_flush_emits_partial_write() {
        let mut encoder = EncoderBuilder::new()
            .auto_flush(true)
            .block_checksum(BlockChecksum::NoBlockChecksum)
            .checksum(ContentChecksum::NoChecksum)
            .build(Vec::new())
            .unwrap();
        // With auto-flush on, a short write must reach the sink before
        // `finish` is called.
        let header_len = encoder.writer().len();
        encoder.write_all(b"partial").unwrap();
        assert!(encoder.writer().len() > header_len);

        let (compressed, result) = encoder.finish();
        result.unwrap();
        assert_eq!(decode_all(&compressed), b"partial");
    }

    #[test]
    fn test_encoder_send() {
        fn check_send<S: Send>(_: &S) {}
        let enc = EncoderBuilder::new().build(Vec::new());
        check_send(&enc);
    }

    #[test]
    fn test_favor_dec_speed() {
        let encoder = EncoderBuilder::new()
            .level(11)
            .favor_dec_speed(true)
            .build(Vec::new())
            .unwrap();
        assert_round_trip(encoder);
    }
}