litetx/
decoder.rs

1use crate::{
2    ltx::{HeaderDecodeError, PageHeader, PageHeaderDecodeError, TrailerDecodeError, CRC64},
3    Checksum, Header, HeaderFlags, PageNum, PageSize, Trailer,
4};
5use lz4_flex::frame::FrameDecoder;
6use std::io::{self, Read};
7
8/// An error that can be returned by [`Decoder`].
9#[derive(thiserror::Error, Debug)]
10pub enum Error {
11    #[error("header")]
12    Header(#[from] HeaderDecodeError),
13    #[error("page header")]
14    PageHeader(#[from] PageHeaderDecodeError),
15    #[error("trailer")]
16    Trailer(#[from] TrailerDecodeError),
17    #[error("invalid page buffer size: {0}, expected {1}")]
18    InvalidBufferSize(usize, PageSize),
19    #[error("file checksum mismatch")]
20    FileChecksumMismatch,
21    #[error("read")]
22    Read(#[from] io::Error),
23}
24
25/// An LTX file decoder.
26///
27/// # Example
28/// ```no_run
29/// # use std::time::SystemTime;
30/// # let v = Vec::new();
31/// # let mut r = &mut &v[..];
32/// #
33/// let (mut dec, header) = litetx::Decoder::new(r).expect("decoder");
34///
35/// let mut buf = vec![0; header.page_size.into_inner() as usize];
36/// while let Some(page_num) = dec.decode_page(&mut buf).expect("decode_page") {
37///     // do something with the page
38/// }
39///
40/// let trailer = dec.finish().expect("finish");
41/// ```
42pub struct Decoder<'a, R>
43where
44    R: io::Read,
45{
46    r: LTXReader<R>,
47    digest: crc::Digest<'a, u64>,
48    page_size: PageSize,
49    pages_done: bool,
50}
51
52impl<'a, R> Decoder<'a, R>
53where
54    R: io::Read,
55{
56    /// Construct a new [`Decoder`] that reads from `r`.
57    pub fn new(mut r: R) -> Result<(Decoder<'a, R>, Header), Error> {
58        let mut digest = CRC64.digest();
59        let hdr = {
60            let reader = CrcDigestRead::new(&mut r, &mut digest);
61            Header::decode_from(reader)?
62        };
63
64        Ok((
65            Decoder {
66                r: LTXReader::new(r, hdr.flags.contains(HeaderFlags::COMPRESS_LZ4)),
67                digest,
68                page_size: hdr.page_size,
69                pages_done: false,
70            },
71            hdr,
72        ))
73    }
74
75    /// Decode the next page from the LTX file.
76    ///
77    /// Returns `Ok(Some(page_num))` if a page has been successfully decoded.
78    /// Return `Ok(None)` if the LTX file doesn't have any more pages.
79    pub fn decode_page(&mut self, data: &mut [u8]) -> Result<Option<PageNum>, Error> {
80        if self.pages_done {
81            return Ok(None);
82        };
83
84        if data.len() != self.page_size.into_inner() as usize {
85            return Err(Error::InvalidBufferSize(data.len(), self.page_size));
86        }
87
88        let mut reader = CrcDigestRead::new(&mut self.r, &mut self.digest);
89        let header = PageHeader::decode_from(&mut reader)?;
90        if header.0.is_none() {
91            self.pages_done = true;
92            return Ok(None);
93        };
94
95        reader.read_exact(data)?;
96
97        Ok(header.0)
98    }
99
100    /// Consume the decoder and verify file checksum.
101    pub fn finish(mut self) -> Result<Trailer, Error> {
102        let reader = self.r.finish()?;
103        let trailer = Trailer::decode_from(reader)?;
104
105        self.digest
106            .update(&trailer.post_apply_checksum.into_inner().to_be_bytes());
107
108        if Checksum::new(self.digest.finalize()) != trailer.file_checksum {
109            return Err(Error::FileChecksumMismatch);
110        }
111
112        Ok(trailer)
113    }
114}
115
116struct LTXReader<R>
117where
118    R: io::Read,
119{
120    dec: FrameDecoder<R>,
121    compressed: bool,
122}
123
124impl<R> LTXReader<R>
125where
126    R: io::Read,
127{
128    fn new(r: R, compressed: bool) -> LTXReader<R> {
129        LTXReader {
130            dec: FrameDecoder::new(r),
131            compressed,
132        }
133    }
134
135    fn finish(mut self) -> io::Result<R> {
136        // Read lz4 trailer frame.
137        if self.compressed {
138            let mut buf = [0; 1];
139            match self.dec.read_exact(&mut buf) {
140                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (),
141                Err(e) => return Err(e),
142                _ => {
143                    return Err(io::Error::new(
144                        io::ErrorKind::Other,
145                        "expected lz4 end frame",
146                    ))
147                }
148            }
149        }
150
151        Ok(self.dec.into_inner())
152    }
153}
154
155impl<R> io::Read for LTXReader<R>
156where
157    R: io::Read,
158{
159    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
160        if self.compressed {
161            self.dec.read(buf)
162        } else {
163            self.dec.get_mut().read(buf)
164        }
165    }
166}
167
168/// An [`io::Read`] computing a digest on the bytes read.
169struct CrcDigestRead<'a, 'b, R>
170where
171    R: io::Read,
172{
173    inner: R,
174    digest: &'a mut crc::Digest<'b, u64>,
175}
176
177impl<'a, 'b, R> CrcDigestRead<'a, 'b, R>
178where
179    R: io::Read,
180{
181    fn new(inner: R, digest: &'a mut crc::Digest<'b, u64>) -> Self {
182        CrcDigestRead { inner, digest }
183    }
184}
185
186impl<'a, 'b, R> io::Read for CrcDigestRead<'a, 'b, R>
187where
188    R: io::Read,
189{
190    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
191        let read = self.inner.read(buf)?;
192        self.digest.update(&buf[..read]);
193        Ok(read)
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::{CrcDigestRead, Decoder};
    use crate::{
        ltx::CRC64, utils::TimeRound, Checksum, Encoder, Header, HeaderFlags, PageNum, PageSize,
        TXID,
    };
    use std::{io::Read, time};

    /// Bytes read through [`CrcDigestRead`] come out unchanged and end up in
    /// the digest.
    #[test]
    fn crc_digest_read() {
        let input: Vec<u8> = (1..=10).collect();
        let mut output = vec![0; 10];
        let mut digest = CRC64.digest();

        let mut reader = CrcDigestRead::new(input.as_slice(), &mut digest);
        let read = reader.read(&mut output);

        assert!(matches!(read, Ok(10)));
        assert_eq!(input, output);
        assert_eq!(6672316476627126589, digest.finalize());
    }

    /// Builds one page worth of random bytes.
    fn random_page() -> Vec<u8> {
        (0..4096).map(|_| rand::random::<u8>()).collect()
    }

    /// Encodes a two-page file with the given header flags and checks that
    /// the decoder returns the same header, pages and trailer.
    fn decoder_test(flags: HeaderFlags) {
        let header = Header {
            flags,
            page_size: PageSize::new(4096).unwrap(),
            commit: PageNum::new(3).unwrap(),
            min_txid: TXID::new(5).unwrap(),
            max_txid: TXID::new(6).unwrap(),
            timestamp: time::SystemTime::now()
                .round(time::Duration::from_millis(1))
                .unwrap(),
            pre_apply_checksum: Some(Checksum::new(5)),
        };

        let pages: Vec<(PageNum, Vec<u8>)> = vec![
            (PageNum::new(4).unwrap(), random_page()),
            (PageNum::new(6).unwrap(), random_page()),
        ];

        // Encode header, pages and trailer into an in-memory buffer.
        let mut encoded = Vec::new();
        let mut enc = Encoder::new(&mut encoded, &header).expect("failed to create encoder");
        for (page_num, page) in &pages {
            enc.encode_page(*page_num, page.as_slice())
                .expect("failed to encode page");
        }
        let trailer = enc
            .finish(Checksum::new(6))
            .expect("failed to finish encoder");

        // Decode the buffer again and compare every part with what went in.
        let (mut dec, header_out) =
            Decoder::new(encoded.as_slice()).expect("failed to create decoder");
        assert_eq!(header, header_out);

        let mut page_out = vec![0; 4096];
        for (expected_num, expected_page) in pages {
            let decoded = dec.decode_page(page_out.as_mut_slice());
            assert!(matches!(decoded, Ok(Some(num)) if num == expected_num));
            assert_eq!(expected_page, page_out);
        }

        // No pages are left once both have been decoded.
        let exhausted = dec.decode_page(page_out.as_mut_slice());
        assert!(matches!(exhausted, Ok(None)));

        let trailer_out = dec.finish().expect("failed to finish decoder");
        assert_eq!(trailer, trailer_out);
    }

    #[test]
    fn decoder() {
        decoder_test(HeaderFlags::empty());
    }

    #[test]
    fn decoder_compressed() {
        decoder_test(HeaderFlags::COMPRESS_LZ4);
    }
}