compression_codecs/lz4/
encoder.rs

1use crate::{lz4::params::EncoderParams, EncodeV2};
2use compression_core::{
3    unshared::Unshared,
4    util::{PartialBuffer, WriteBuffer},
5};
6use lz4::liblz4::{
7    check_error, LZ4FCompressionContext, LZ4FPreferences, LZ4F_compressBegin, LZ4F_compressBound,
8    LZ4F_compressEnd, LZ4F_compressUpdate, LZ4F_createCompressionContext, LZ4F_flush,
9    LZ4F_freeCompressionContext, LZ4F_VERSION,
10};
11use std::io::{self, Result};
12
/// Largest size, in bytes, that an LZ4 frame header can occupy.
///
/// Mirrors the `LZ4F_HEADER_SIZE_MAX` definition in liblz4's `lz4frame.h`:
/// <https://github.com/lz4/lz4/blob/9d53d8bb6c4120345a0966e5d8b16d7def1f32c5/lib/lz4frame.h#L281>
const LZ4F_HEADER_SIZE_MAX: usize = 19;
15
16#[derive(Debug)]
17struct EncoderContext {
18    ctx: LZ4FCompressionContext,
19}
20
/// Progress of the encoder through a single LZ4 frame.
#[derive(Debug, Clone, Copy)]
enum State {
    /// Initial state: `LZ4F_compressBegin` has not been run yet.
    Header,
    /// `LZ4F_compressBegin` succeeded; input is being compressed.
    Encoding,
    /// `LZ4F_compressEnd` succeeded; any output still held in the internal
    /// buffer is being drained.
    Footer,
    /// The frame has been ended and the internal buffer is fully drained.
    Done,
}
28
29enum Lz4Fn<'a, 'b> {
30    Begin,
31    Update {
32        input: &'a mut PartialBuffer<&'b [u8]>,
33    },
34    Flush,
35    End,
36}
37
38#[derive(Debug)]
39pub struct Lz4Encoder {
40    ctx: Unshared<EncoderContext>,
41    state: State,
42    preferences: LZ4FPreferences,
43    limit: usize,
44    maybe_buffer: Option<PartialBuffer<Vec<u8>>>,
45    /// Minimum dst buffer size for a block
46    block_buffer_size: usize,
47    /// Minimum dst buffer size for flush/end
48    flush_buffer_size: usize,
49}
50
51// minimum size of destination buffer for compressing `src_size` bytes
52fn min_dst_size(src_size: usize, preferences: &LZ4FPreferences) -> usize {
53    unsafe { LZ4F_compressBound(src_size, preferences) }
54}
55
56impl EncoderContext {
57    fn new() -> Result<Self> {
58        let mut context = LZ4FCompressionContext(core::ptr::null_mut());
59        check_error(unsafe { LZ4F_createCompressionContext(&mut context, LZ4F_VERSION) })?;
60        Ok(Self { ctx: context })
61    }
62}
63
64impl Drop for EncoderContext {
65    fn drop(&mut self) {
66        unsafe { LZ4F_freeCompressionContext(self.ctx) };
67    }
68}
69
70impl Lz4Encoder {
71    pub fn new(params: EncoderParams) -> Self {
72        let preferences = LZ4FPreferences::from(params);
73        let block_size = preferences.frame_info.block_size_id.get_size();
74
75        let block_buffer_size = min_dst_size(block_size, &preferences);
76        let flush_buffer_size = min_dst_size(0, &preferences);
77
78        Self {
79            ctx: Unshared::new(EncoderContext::new().unwrap()),
80            state: State::Header,
81            preferences,
82            limit: block_size,
83            maybe_buffer: None,
84            block_buffer_size,
85            flush_buffer_size,
86        }
87    }
88
89    pub fn buffer_size(&self) -> usize {
90        self.block_buffer_size
91    }
92
93    fn drain_buffer(&mut self, output: &mut WriteBuffer<'_>) -> (usize, usize) {
94        match self.maybe_buffer.as_mut() {
95            Some(buffer) => {
96                let drained_bytes = output.copy_unwritten_from(buffer);
97                (drained_bytes, buffer.unwritten().len())
98            }
99            None => (0, 0),
100        }
101    }
102
103    fn write(&mut self, lz4_fn: Lz4Fn<'_, '_>, output: &mut WriteBuffer<'_>) -> Result<usize> {
104        let (drained_before, undrained) = self.drain_buffer(output);
105        if undrained > 0 {
106            return Ok(drained_before);
107        }
108
109        let mut src_size = 0;
110
111        let min_dst_size = match &lz4_fn {
112            Lz4Fn::Begin => LZ4F_HEADER_SIZE_MAX,
113            Lz4Fn::Update { input } => {
114                src_size = input.unwritten().len().min(self.limit);
115                min_dst_size(src_size, &self.preferences)
116            }
117            Lz4Fn::Flush | Lz4Fn::End => self.flush_buffer_size,
118        };
119
120        let out_buf = output.initialize_unwritten();
121        let output_len = out_buf.len();
122
123        let (dst_buffer, dst_size, maybe_internal_buffer) = if min_dst_size > output_len {
124            let buffer_size = self.block_buffer_size;
125            let buffer = self
126                .maybe_buffer
127                .get_or_insert_with(|| PartialBuffer::new(Vec::with_capacity(buffer_size)));
128            buffer.reset();
129            (
130                buffer.unwritten_mut().as_mut_ptr(),
131                buffer_size,
132                Some(buffer),
133            )
134        } else {
135            (out_buf.as_mut_ptr(), output_len, None)
136        };
137
138        let len = match lz4_fn {
139            Lz4Fn::Begin => {
140                let len = check_error(unsafe {
141                    LZ4F_compressBegin(
142                        self.ctx.get_mut().ctx,
143                        dst_buffer,
144                        dst_size,
145                        &self.preferences,
146                    )
147                })?;
148                self.state = State::Encoding;
149                len
150            }
151            Lz4Fn::Update { input } => {
152                let len = check_error(unsafe {
153                    LZ4F_compressUpdate(
154                        self.ctx.get_mut().ctx,
155                        dst_buffer,
156                        dst_size,
157                        input.unwritten().as_ptr(),
158                        src_size,
159                        core::ptr::null(),
160                    )
161                })?;
162                input.advance(src_size);
163                len
164            }
165            Lz4Fn::Flush => check_error(unsafe {
166                LZ4F_flush(
167                    self.ctx.get_mut().ctx,
168                    dst_buffer,
169                    dst_size,
170                    core::ptr::null(),
171                )
172            })?,
173            Lz4Fn::End => {
174                let len = check_error(unsafe {
175                    LZ4F_compressEnd(
176                        self.ctx.get_mut().ctx,
177                        dst_buffer,
178                        dst_size,
179                        core::ptr::null(),
180                    )
181                })?;
182                self.state = State::Footer;
183                len
184            }
185        };
186
187        let drained_after = if let Some(internal_buffer) = maybe_internal_buffer {
188            unsafe {
189                internal_buffer.get_mut().set_len(len);
190            }
191            let (d, _) = self.drain_buffer(output);
192            d
193        } else {
194            output.advance(len);
195            len
196        };
197
198        Ok(drained_before + drained_after)
199    }
200}
201
202impl EncodeV2 for Lz4Encoder {
203    fn encode(
204        &mut self,
205        input: &mut PartialBuffer<&[u8]>,
206        output: &mut WriteBuffer<'_>,
207    ) -> Result<()> {
208        loop {
209            match self.state {
210                State::Header => {
211                    self.write(Lz4Fn::Begin, output)?;
212                }
213
214                State::Encoding => {
215                    self.write(Lz4Fn::Update { input }, output)?;
216                }
217
218                State::Footer | State::Done => {
219                    return Err(io::Error::other("encode after complete"));
220                }
221            }
222
223            if input.unwritten().is_empty() || output.has_no_spare_space() {
224                return Ok(());
225            }
226        }
227    }
228
229    fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
230        loop {
231            let done = match self.state {
232                State::Header => {
233                    self.write(Lz4Fn::Begin, output)?;
234                    false
235                }
236
237                State::Encoding => {
238                    let len = self.write(Lz4Fn::Flush, output)?;
239                    len == 0
240                }
241
242                State::Footer => {
243                    let (_, undrained) = self.drain_buffer(output);
244                    if undrained == 0 {
245                        self.state = State::Done;
246                        true
247                    } else {
248                        false
249                    }
250                }
251
252                State::Done => true,
253            };
254
255            if done {
256                return Ok(true);
257            }
258
259            if output.has_no_spare_space() {
260                return Ok(false);
261            }
262        }
263    }
264
265    fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
266        loop {
267            match self.state {
268                State::Header => {
269                    self.write(Lz4Fn::Begin, output)?;
270                }
271
272                State::Encoding => {
273                    self.write(Lz4Fn::End, output)?;
274                }
275
276                State::Footer => {
277                    let (_, undrained) = self.drain_buffer(output);
278                    if undrained == 0 {
279                        self.state = State::Done;
280                    }
281                }
282
283                State::Done => {}
284            }
285
286            if let State::Done = self.state {
287                return Ok(true);
288            }
289
290            if output.has_no_spare_space() {
291                return Ok(false);
292            }
293        }
294    }
295}