1use crate::{
2 ltx::{HeaderDecodeError, PageHeader, PageHeaderDecodeError, TrailerDecodeError, CRC64},
3 Checksum, Header, HeaderFlags, PageNum, PageSize, Trailer,
4};
5use lz4_flex::frame::FrameDecoder;
6use std::io::{self, Read};
7
8#[derive(thiserror::Error, Debug)]
10pub enum Error {
11 #[error("header")]
12 Header(#[from] HeaderDecodeError),
13 #[error("page header")]
14 PageHeader(#[from] PageHeaderDecodeError),
15 #[error("trailer")]
16 Trailer(#[from] TrailerDecodeError),
17 #[error("invalid page buffer size: {0}, expected {1}")]
18 InvalidBufferSize(usize, PageSize),
19 #[error("file checksum mismatch")]
20 FileChecksumMismatch,
21 #[error("read")]
22 Read(#[from] io::Error),
23}
24
25pub struct Decoder<'a, R>
43where
44 R: io::Read,
45{
46 r: LTXReader<R>,
47 digest: crc::Digest<'a, u64>,
48 page_size: PageSize,
49 pages_done: bool,
50}
51
52impl<'a, R> Decoder<'a, R>
53where
54 R: io::Read,
55{
56 pub fn new(mut r: R) -> Result<(Decoder<'a, R>, Header), Error> {
58 let mut digest = CRC64.digest();
59 let hdr = {
60 let reader = CrcDigestRead::new(&mut r, &mut digest);
61 Header::decode_from(reader)?
62 };
63
64 Ok((
65 Decoder {
66 r: LTXReader::new(r, hdr.flags.contains(HeaderFlags::COMPRESS_LZ4)),
67 digest,
68 page_size: hdr.page_size,
69 pages_done: false,
70 },
71 hdr,
72 ))
73 }
74
75 pub fn decode_page(&mut self, data: &mut [u8]) -> Result<Option<PageNum>, Error> {
80 if self.pages_done {
81 return Ok(None);
82 };
83
84 if data.len() != self.page_size.into_inner() as usize {
85 return Err(Error::InvalidBufferSize(data.len(), self.page_size));
86 }
87
88 let mut reader = CrcDigestRead::new(&mut self.r, &mut self.digest);
89 let header = PageHeader::decode_from(&mut reader)?;
90 if header.0.is_none() {
91 self.pages_done = true;
92 return Ok(None);
93 };
94
95 reader.read_exact(data)?;
96
97 Ok(header.0)
98 }
99
100 pub fn finish(mut self) -> Result<Trailer, Error> {
102 let reader = self.r.finish()?;
103 let trailer = Trailer::decode_from(reader)?;
104
105 self.digest
106 .update(&trailer.post_apply_checksum.into_inner().to_be_bytes());
107
108 if Checksum::new(self.digest.finalize()) != trailer.file_checksum {
109 return Err(Error::FileChecksumMismatch);
110 }
111
112 Ok(trailer)
113 }
114}
115
116struct LTXReader<R>
117where
118 R: io::Read,
119{
120 dec: FrameDecoder<R>,
121 compressed: bool,
122}
123
124impl<R> LTXReader<R>
125where
126 R: io::Read,
127{
128 fn new(r: R, compressed: bool) -> LTXReader<R> {
129 LTXReader {
130 dec: FrameDecoder::new(r),
131 compressed,
132 }
133 }
134
135 fn finish(mut self) -> io::Result<R> {
136 if self.compressed {
138 let mut buf = [0; 1];
139 match self.dec.read_exact(&mut buf) {
140 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (),
141 Err(e) => return Err(e),
142 _ => {
143 return Err(io::Error::new(
144 io::ErrorKind::Other,
145 "expected lz4 end frame",
146 ))
147 }
148 }
149 }
150
151 Ok(self.dec.into_inner())
152 }
153}
154
155impl<R> io::Read for LTXReader<R>
156where
157 R: io::Read,
158{
159 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
160 if self.compressed {
161 self.dec.read(buf)
162 } else {
163 self.dec.get_mut().read(buf)
164 }
165 }
166}
167
168struct CrcDigestRead<'a, 'b, R>
170where
171 R: io::Read,
172{
173 inner: R,
174 digest: &'a mut crc::Digest<'b, u64>,
175}
176
177impl<'a, 'b, R> CrcDigestRead<'a, 'b, R>
178where
179 R: io::Read,
180{
181 fn new(inner: R, digest: &'a mut crc::Digest<'b, u64>) -> Self {
182 CrcDigestRead { inner, digest }
183 }
184}
185
186impl<'a, 'b, R> io::Read for CrcDigestRead<'a, 'b, R>
187where
188 R: io::Read,
189{
190 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
191 let read = self.inner.read(buf)?;
192 self.digest.update(&buf[..read]);
193 Ok(read)
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::{CrcDigestRead, Decoder};
    use crate::{
        ltx::CRC64, utils::TimeRound, Checksum, Encoder, Header, HeaderFlags, PageNum, PageSize,
        TXID,
    };
    use std::{io::Read, time};

    /// Length in bytes of the pages used by the round-trip tests.
    const PAGE_LEN: usize = 4096;

    /// Builds a page of `len` random bytes.
    fn random_page(len: usize) -> Vec<u8> {
        (0..len).map(|_| rand::random::<u8>()).collect()
    }

    /// Reading through `CrcDigestRead` returns the data unchanged and updates
    /// the digest with it.
    #[test]
    fn crc_digest_read() {
        let buf_in: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut buf_out = vec![0; 10];

        let mut digest = CRC64.digest();
        let mut reader = CrcDigestRead::new(buf_in.as_slice(), &mut digest);
        let read = reader.read(&mut buf_out);

        assert!(matches!(read, Ok(10)));
        assert_eq!(buf_in, buf_out);
        assert_eq!(6672316476627126589, digest.finalize());
    }

    /// Encodes a header, two random pages and a trailer, then checks that the
    /// decoder returns all of them unchanged.
    fn decoder_test(flags: HeaderFlags) {
        let timestamp = time::SystemTime::now()
            .round(time::Duration::from_millis(1))
            .unwrap();
        let header = Header {
            flags,
            page_size: PageSize::new(4096).unwrap(),
            commit: PageNum::new(3).unwrap(),
            min_txid: TXID::new(5).unwrap(),
            max_txid: TXID::new(6).unwrap(),
            timestamp,
            pre_apply_checksum: Some(Checksum::new(5)),
        };

        let pages: Vec<(PageNum, Vec<u8>)> = vec![
            (PageNum::new(4).unwrap(), random_page(PAGE_LEN)),
            (PageNum::new(6).unwrap(), random_page(PAGE_LEN)),
        ];

        // Encode everything into an in-memory buffer.
        let mut buf = Vec::new();
        let mut enc = Encoder::new(&mut buf, &header).expect("failed to create encoder");
        for (page_num, page) in &pages {
            enc.encode_page(*page_num, page.as_slice())
                .expect("failed to encode page");
        }
        let trailer = enc
            .finish(Checksum::new(6))
            .expect("failed to finish encoder");

        // Decode the buffer and compare against what was encoded.
        let (mut dec, header_out) = Decoder::new(buf.as_slice()).expect("failed to create decoder");
        assert_eq!(header, header_out);

        let mut page_out = vec![0; PAGE_LEN];
        for (expected_num, expected_page) in &pages {
            let decoded = dec.decode_page(&mut page_out);
            assert!(matches!(decoded, Ok(Some(num)) if num == *expected_num));
            assert_eq!(expected_page, &page_out);
        }

        // After the last page the decoder reports the end of the page stream.
        let end = dec.decode_page(&mut page_out);
        assert!(matches!(end, Ok(None)));

        let trailer_out = dec.finish().expect("failed to finish decoder");
        assert_eq!(trailer, trailer_out);
    }

    #[test]
    fn decoder() {
        decoder_test(HeaderFlags::empty());
    }

    #[test]
    fn decoder_compressed() {
        decoder_test(HeaderFlags::COMPRESS_LZ4);
    }
}