compression_codecs/lz4/
encoder.rs

1use crate::{lz4::params::EncoderParams, EncodeV2};
2use compression_core::{
3    unshared::Unshared,
4    util::{PartialBuffer, WriteBuffer},
5};
6use lz4::liblz4::{
7    check_error, LZ4FCompressionContext, LZ4FPreferences, LZ4F_compressBegin, LZ4F_compressBound,
8    LZ4F_compressEnd, LZ4F_compressUpdate, LZ4F_createCompressionContext, LZ4F_flush,
9    LZ4F_freeCompressionContext, LZ4F_VERSION,
10};
11use std::io::{self, Result};
12
/// Destination capacity requested before writing the frame header with
/// `LZ4F_compressBegin`.
///
/// Mirrors the `LZ4F_HEADER_SIZE_MAX` constant of liblz4:
/// <https://github.com/lz4/lz4/blob/9d53d8bb6c4120345a0966e5d8b16d7def1f32c5/lib/lz4frame.h#L281>
const LZ4F_HEADER_SIZE_MAX: usize = 19;
15
16#[derive(Debug)]
17struct EncoderContext {
18    ctx: LZ4FCompressionContext,
19}
20
/// Progress of the encoder through a single LZ4 frame.
#[derive(Clone, Copy, Debug)]
enum State {
    /// The frame header has not been produced yet.
    Header,
    /// The header has been produced; input may be compressed or flushed.
    Encoding,
    /// The frame end has been produced but may still need draining from the
    /// internal buffer.
    Footer,
    /// The whole frame, footer included, has been handed to the caller.
    Done,
}
28
29enum Lz4Fn<'a, 'b> {
30    Begin,
31    Update {
32        input: &'a mut PartialBuffer<&'b [u8]>,
33    },
34    Flush,
35    End,
36}
37
38#[derive(Debug)]
39pub struct Lz4Encoder {
40    ctx: Unshared<EncoderContext>,
41    state: State,
42    preferences: LZ4FPreferences,
43    limit: usize,
44    maybe_buffer: Option<PartialBuffer<Vec<u8>>>,
45    /// Minimum dst buffer size for a block
46    block_buffer_size: usize,
47    /// Minimum dst buffer size for flush/end
48    flush_buffer_size: usize,
49}
50
51// minimum size of destination buffer for compressing `src_size` bytes
52fn min_dst_size(src_size: usize, preferences: &LZ4FPreferences) -> usize {
53    unsafe { LZ4F_compressBound(src_size, preferences) }
54}
55
56impl EncoderContext {
57    fn new() -> Result<Self> {
58        let mut context = LZ4FCompressionContext(core::ptr::null_mut());
59        check_error(unsafe { LZ4F_createCompressionContext(&mut context, LZ4F_VERSION) })?;
60        Ok(Self { ctx: context })
61    }
62}
63
64impl Drop for EncoderContext {
65    fn drop(&mut self) {
66        unsafe { LZ4F_freeCompressionContext(self.ctx) };
67    }
68}
69
70impl Lz4Encoder {
71    pub fn new(params: EncoderParams) -> Self {
72        let preferences = LZ4FPreferences::from(params);
73        let block_size = preferences.frame_info.block_size_id.get_size();
74
75        let block_buffer_size = min_dst_size(block_size, &preferences);
76        let flush_buffer_size = min_dst_size(0, &preferences);
77
78        Self {
79            ctx: Unshared::new(EncoderContext::new().unwrap()),
80            state: State::Header,
81            preferences,
82            limit: block_size,
83            maybe_buffer: None,
84            block_buffer_size,
85            flush_buffer_size,
86        }
87    }
88
89    pub fn buffer_size(&self) -> usize {
90        self.block_buffer_size
91    }
92
93    fn drain_buffer(&mut self, output: &mut WriteBuffer<'_>) -> (usize, usize) {
94        match self.maybe_buffer.as_mut() {
95            Some(buffer) => {
96                let drained_bytes = output.copy_unwritten_from(buffer);
97                (drained_bytes, buffer.unwritten().len())
98            }
99            None => (0, 0),
100        }
101    }
102
103    fn write(&mut self, lz4_fn: Lz4Fn<'_, '_>, output: &mut WriteBuffer<'_>) -> Result<usize> {
104        let (drained_before, undrained) = self.drain_buffer(output);
105        if undrained > 0 || output.has_no_spare_space() {
106            return Ok(drained_before);
107        }
108
109        let mut src_size = 0;
110
111        let min_dst_size = match &lz4_fn {
112            Lz4Fn::Begin => LZ4F_HEADER_SIZE_MAX,
113            Lz4Fn::Update { input } => {
114                src_size = input.unwritten().len().min(self.limit);
115                min_dst_size(src_size, &self.preferences)
116            }
117            Lz4Fn::Flush | Lz4Fn::End => self.flush_buffer_size,
118        };
119
120        // Safety: We **trust** lz4 to not write uninitialized bytes
121        let out_buf = unsafe { output.unwritten_mut() };
122        let output_len = out_buf.len();
123
124        let (dst_buffer, dst_size, maybe_internal_buffer) = if min_dst_size > output_len {
125            let buffer_size = self.block_buffer_size;
126            let buffer = self
127                .maybe_buffer
128                .get_or_insert_with(|| PartialBuffer::new(Vec::with_capacity(buffer_size)));
129            buffer.reset();
130            buffer.get_mut().clear();
131            (
132                buffer.get_mut().spare_capacity_mut().as_mut_ptr(),
133                buffer_size,
134                Some(buffer),
135            )
136        } else {
137            (out_buf.as_mut_ptr(), output_len, None)
138        };
139        let dst_buffer = dst_buffer as *mut u8;
140
141        let len = match lz4_fn {
142            Lz4Fn::Begin => {
143                let len = check_error(unsafe {
144                    LZ4F_compressBegin(
145                        self.ctx.get_mut().ctx,
146                        dst_buffer,
147                        dst_size,
148                        &self.preferences,
149                    )
150                })?;
151                self.state = State::Encoding;
152                len
153            }
154            Lz4Fn::Update { input } => {
155                let len = check_error(unsafe {
156                    LZ4F_compressUpdate(
157                        self.ctx.get_mut().ctx,
158                        dst_buffer,
159                        dst_size,
160                        input.unwritten().as_ptr(),
161                        src_size,
162                        core::ptr::null(),
163                    )
164                })?;
165                input.advance(src_size);
166                len
167            }
168            Lz4Fn::Flush => check_error(unsafe {
169                LZ4F_flush(
170                    self.ctx.get_mut().ctx,
171                    dst_buffer,
172                    dst_size,
173                    core::ptr::null(),
174                )
175            })?,
176            Lz4Fn::End => {
177                let len = check_error(unsafe {
178                    LZ4F_compressEnd(
179                        self.ctx.get_mut().ctx,
180                        dst_buffer,
181                        dst_size,
182                        core::ptr::null(),
183                    )
184                })?;
185                self.state = State::Footer;
186                len
187            }
188        };
189
190        let drained_after = if let Some(internal_buffer) = maybe_internal_buffer {
191            // Safety: We **trust** lz4 to properly write data into the buffer
192            unsafe {
193                internal_buffer.get_mut().set_len(len);
194            }
195            let (d, _) = self.drain_buffer(output);
196            d
197        } else {
198            // Safety: We **trust** lz4 to properly write data into the buffer
199            unsafe { output.assume_init_and_advance(len) };
200            len
201        };
202
203        Ok(drained_before + drained_after)
204    }
205}
206
207impl EncodeV2 for Lz4Encoder {
208    fn encode(
209        &mut self,
210        input: &mut PartialBuffer<&[u8]>,
211        output: &mut WriteBuffer<'_>,
212    ) -> Result<()> {
213        loop {
214            match self.state {
215                State::Header => {
216                    self.write(Lz4Fn::Begin, output)?;
217                }
218
219                State::Encoding => {
220                    self.write(Lz4Fn::Update { input }, output)?;
221                }
222
223                State::Footer | State::Done => {
224                    return Err(io::Error::other("encode after complete"));
225                }
226            }
227
228            if input.unwritten().is_empty() || output.has_no_spare_space() {
229                return Ok(());
230            }
231        }
232    }
233
234    fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
235        loop {
236            let done = match self.state {
237                State::Header => {
238                    self.write(Lz4Fn::Begin, output)?;
239                    false
240                }
241
242                State::Encoding => {
243                    let len = self.write(Lz4Fn::Flush, output)?;
244                    len == 0
245                }
246
247                State::Footer => {
248                    let (_, undrained) = self.drain_buffer(output);
249                    if undrained == 0 {
250                        self.state = State::Done;
251                        true
252                    } else {
253                        false
254                    }
255                }
256
257                State::Done => true,
258            };
259
260            if done {
261                return Ok(true);
262            }
263
264            if output.has_no_spare_space() {
265                return Ok(false);
266            }
267        }
268    }
269
270    fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
271        loop {
272            match self.state {
273                State::Header => {
274                    self.write(Lz4Fn::Begin, output)?;
275                }
276
277                State::Encoding => {
278                    self.write(Lz4Fn::End, output)?;
279                }
280
281                State::Footer => {
282                    let (_, undrained) = self.drain_buffer(output);
283                    if undrained == 0 {
284                        self.state = State::Done;
285                    }
286                }
287
288                State::Done => {}
289            }
290
291            if let State::Done = self.state {
292                return Ok(true);
293            }
294
295            if output.has_no_spare_space() {
296                return Ok(false);
297            }
298        }
299    }
300}