litetx/
ltx.rs

1use crate::types::{Checksum, PageNum, PageNumError, PageSize, PageSizeError, TXIDError, TXID};
2use std::{io, time};
3
4pub(crate) const CRC64: crc::Crc<u64> = crc::Crc::<u64>::new(&crc::CRC_64_GO_ISO);
5
6bitflags::bitflags! {
7    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
8    pub struct HeaderFlags: u32 {
9        const COMPRESS_LZ4 = 0b00000001;
10    }
11}
12
13/// A header validation error.
14#[derive(thiserror::Error, Debug)]
15pub enum HeaderValidateError {
16    #[error("transaction ids out of order: ({0}, {1})")]
17    TXIDOrder(TXID, TXID),
18    #[error("pre-apply checksum must be unset on snapshots")]
19    PreApplyChecksumOnSnapshot,
20    #[error("pre-apply checksum required on non-snapshot files")]
21    NoPreApplyChecksum,
22}
23
24/// A header encoding error.
25#[derive(thiserror::Error, Debug)]
26pub enum HeaderEncodeError {
27    #[error("validation failed")]
28    Validation(#[from] HeaderValidateError),
29    #[error("invalid timestamp: {0}")]
30    Timestamp(time::SystemTimeError),
31    #[error("write error")]
32    Write(#[from] io::Error),
33}
34
35/// A header decoding error.
36#[derive(thiserror::Error, Debug)]
37pub enum HeaderDecodeError {
38    #[error("read error")]
39    Read(#[from] io::Error),
40    #[error("invalid magic record: {0:?}")]
41    Magic([u8; 4]),
42    #[error("invalid flags record: {0:x}")]
43    Flags(u32),
44    #[error("invalid page size record")]
45    PageSize(#[from] PageSizeError),
46    #[error("invalid commit record: {0}")]
47    Commit(PageNumError),
48    #[error("invalid min TX ID record: {0}")]
49    MinTXID(TXIDError),
50    #[error("invalid max TX ID record: {0}")]
51    MaxTXID(TXIDError),
52    #[error("invalid timestamp: {0}")]
53    Timestamp(u64),
54    #[error("validation failed")]
55    Validation(#[from] HeaderValidateError),
56}
57
58pub(crate) const HEADER_SIZE: usize = 100;
59pub(crate) const TRAILER_SIZE: usize = 16;
60pub(crate) const PAGE_HEADER_SIZE: usize = 4;
61
62/// An LTX file header.
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct Header {
65    /// Flags changing the behavior of LTX encoder/decoder.
66    pub flags: HeaderFlags,
67    /// The size of the database pages encoded in the file.
68    pub page_size: PageSize,
69    /// The size of the database in pages.
70    pub commit: PageNum,
71    /// Minimum transaction ID in the file.
72    pub min_txid: TXID,
73    /// Maximum transaction ID in the file. May be equal to `min_txid` if the file
74    /// contains only one transaction.
75    pub max_txid: TXID,
76    /// The time when the LTX file was created.
77    pub timestamp: time::SystemTime,
78    /// Running database checksum before this LTX file is applied. `None` if the LTX
79    /// file contains the full snapshot of a database.
80    pub pre_apply_checksum: Option<Checksum>,
81}
82
83impl Header {
84    const MAGIC: &'static str = "LTX1";
85
86    pub(crate) fn is_snapshot(&self) -> bool {
87        self.min_txid == TXID::ONE
88    }
89
90    fn validate(&self) -> Result<(), HeaderValidateError> {
91        if self.min_txid > self.max_txid {
92            return Err(HeaderValidateError::TXIDOrder(self.min_txid, self.max_txid));
93        };
94
95        if self.is_snapshot() && self.pre_apply_checksum.is_some() {
96            return Err(HeaderValidateError::PreApplyChecksumOnSnapshot);
97        }
98
99        if !self.is_snapshot() && self.pre_apply_checksum.is_none() {
100            return Err(HeaderValidateError::NoPreApplyChecksum);
101        }
102
103        Ok(())
104    }
105
106    pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), HeaderEncodeError>
107    where
108        W: io::Write,
109    {
110        let mut buf = Vec::with_capacity(HEADER_SIZE);
111        let timestamp = self
112            .timestamp
113            .duration_since(time::SystemTime::UNIX_EPOCH)
114            .map_err(HeaderEncodeError::Timestamp)?
115            .as_millis() as u64;
116        let checksum = if let Some(c) = self.pre_apply_checksum {
117            c.into_inner()
118        } else {
119            0
120        };
121
122        self.validate()?;
123
124        buf.extend_from_slice(Self::MAGIC.as_bytes());
125        buf.extend_from_slice(&self.flags.bits().to_be_bytes());
126        buf.extend_from_slice(&self.page_size.into_inner().to_be_bytes());
127        buf.extend_from_slice(&self.commit.into_inner().to_be_bytes());
128        buf.extend_from_slice(&self.min_txid.into_inner().to_be_bytes());
129        buf.extend_from_slice(&self.max_txid.into_inner().to_be_bytes());
130        buf.extend_from_slice(&timestamp.to_be_bytes());
131        buf.extend_from_slice(&checksum.to_be_bytes());
132        buf.resize(HEADER_SIZE, 0);
133
134        w.write_all(&buf)?;
135
136        Ok(())
137    }
138
139    pub(crate) fn decode_from<R>(mut r: R) -> Result<Header, HeaderDecodeError>
140    where
141        R: io::Read,
142    {
143        let mut buf = vec![0; HEADER_SIZE];
144        r.read_exact(&mut buf)?;
145
146        if &buf[0..4] != Self::MAGIC.as_bytes() {
147            return Err(HeaderDecodeError::Magic(buf[0..4].try_into().unwrap()));
148        }
149
150        let flags = u32::from_be_bytes(buf[4..8].try_into().unwrap());
151        let flags = HeaderFlags::from_bits(flags).ok_or(HeaderDecodeError::Flags(flags))?;
152
153        let page_size = u32::from_be_bytes(buf[8..12].try_into().unwrap());
154        let page_size = PageSize::new(page_size)?;
155
156        let commit = u32::from_be_bytes(buf[12..16].try_into().unwrap());
157        let commit = PageNum::new(commit).map_err(HeaderDecodeError::Commit)?;
158
159        let min_txid = u64::from_be_bytes(buf[16..24].try_into().unwrap());
160        let min_txid = TXID::new(min_txid).map_err(HeaderDecodeError::MinTXID)?;
161
162        let max_txid = u64::from_be_bytes(buf[24..32].try_into().unwrap());
163        let max_txid = TXID::new(max_txid).map_err(HeaderDecodeError::MaxTXID)?;
164
165        let timestamp = u64::from_be_bytes(buf[32..40].try_into().unwrap());
166        let timestamp = time::SystemTime::UNIX_EPOCH
167            .checked_add(time::Duration::from_millis(timestamp))
168            .ok_or(HeaderDecodeError::Timestamp(timestamp))?;
169
170        let pre_apply_checksum = u64::from_be_bytes(buf[40..48].try_into().unwrap());
171        let pre_apply_checksum = if pre_apply_checksum != 0 {
172            Some(Checksum::new(pre_apply_checksum))
173        } else {
174            None
175        };
176
177        let hdr = Header {
178            flags,
179            page_size,
180            commit,
181            min_txid,
182            max_txid,
183            timestamp,
184            pre_apply_checksum,
185        };
186
187        hdr.validate()?;
188
189        Ok(hdr)
190    }
191}
192
193/// A trailer encoding error.
194#[derive(thiserror::Error, Debug)]
195pub enum TrailerEncodeError {
196    #[error("write error")]
197    Write(#[from] io::Error),
198}
199
200/// A trailer decoding error.
201#[derive(thiserror::Error, Debug)]
202pub enum TrailerDecodeError {
203    #[error("read error")]
204    Read(#[from] io::Error),
205    #[error("invalid post apply checksum: {0}")]
206    PostApplyChecksum(u64),
207    #[error("invalid file checksum: {0}")]
208    FileChecksum(u64),
209}
210
211/// An LTX file trailer.
212#[derive(Debug, PartialEq, Eq)]
213pub struct Trailer {
214    /// Running database checksum after this LTX file has been applied.
215    pub post_apply_checksum: Checksum,
216    /// LTX file checksum.
217    pub file_checksum: Checksum,
218}
219
220impl Trailer {
221    pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), TrailerEncodeError>
222    where
223        W: io::Write,
224    {
225        let mut buf = Vec::with_capacity(TRAILER_SIZE);
226
227        buf.extend_from_slice(&self.post_apply_checksum.into_inner().to_be_bytes());
228        buf.extend_from_slice(&self.file_checksum.into_inner().to_be_bytes());
229
230        w.write_all(&buf)?;
231
232        Ok(())
233    }
234
235    pub(crate) fn decode_from<R>(mut r: R) -> Result<Trailer, TrailerDecodeError>
236    where
237        R: io::Read,
238    {
239        let mut buf = [0; TRAILER_SIZE];
240        r.read_exact(&mut buf)?;
241
242        let post_apply_checksum = u64::from_be_bytes(buf[0..8].try_into().unwrap());
243        let file_checksum = u64::from_be_bytes(buf[8..16].try_into().unwrap());
244
245        let trailer = Trailer {
246            post_apply_checksum: Checksum::new(post_apply_checksum),
247            file_checksum: Checksum::new(file_checksum),
248        };
249        if trailer.post_apply_checksum.into_inner() != post_apply_checksum {
250            return Err(TrailerDecodeError::PostApplyChecksum(post_apply_checksum));
251        }
252        if trailer.file_checksum.into_inner() != file_checksum {
253            return Err(TrailerDecodeError::FileChecksum(file_checksum));
254        }
255
256        Ok(trailer)
257    }
258}
259
260/// A page header encoding error.
261#[derive(thiserror::Error, Debug)]
262pub enum PageHeaderEncodeError {
263    #[error("write error")]
264    Write(#[from] io::Error),
265}
266
267/// A page header decoding error.
268#[derive(thiserror::Error, Debug)]
269pub enum PageHeaderDecodeError {
270    #[error("read error")]
271    Read(#[from] io::Error),
272    #[error("invalid page number record: {0}")]
273    PageNum(PageNumError),
274}
275
276#[derive(Debug, PartialEq, Eq)]
277pub(crate) struct PageHeader(pub(crate) Option<PageNum>);
278
279impl PageHeader {
280    pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), PageHeaderEncodeError>
281    where
282        W: io::Write,
283    {
284        let page_num = self.0.map(|n| n.into_inner()).unwrap_or(0);
285        w.write_all(&page_num.to_be_bytes())?;
286
287        Ok(())
288    }
289
290    pub(crate) fn decode_from<R>(mut r: R) -> Result<PageHeader, PageHeaderDecodeError>
291    where
292        R: io::Read,
293    {
294        let mut buf = [0; PAGE_HEADER_SIZE];
295        r.read_exact(&mut buf)?;
296
297        let page_num = u32::from_be_bytes(buf[0..4].try_into().unwrap());
298        let page_num = if page_num != 0 {
299            Some(PageNum::new(page_num).map_err(PageHeaderDecodeError::PageNum)?)
300        } else {
301            None
302        };
303
304        Ok(PageHeader(page_num))
305    }
306}
307
308/// A trait for page checksum calculation.
309pub trait PageChecksum {
310    /// Calculate database page checksum for the given page number.
311    fn page_checksum(&self, pgno: PageNum) -> Checksum;
312}
313
314impl<T> PageChecksum for T
315where
316    T: AsRef<[u8]>,
317{
318    fn page_checksum(&self, pgno: PageNum) -> Checksum {
319        let mut digest = CRC64.digest();
320
321        digest.update(&pgno.into_inner().to_be_bytes());
322        digest.update(self.as_ref());
323
324        Checksum::new(digest.finalize())
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use super::{Header, HeaderFlags, HeaderValidateError, PageHeader, Trailer};
331    use crate::{utils::TimeRound, Checksum, PageNum, PageSize, TXID};
332    use std::time;
333
334    fn encode_decode_header(mut hdr: Header) {
335        let mut buf = Vec::new();
336
337        // round timestamp to milliseconds to be able to compare it later.
338        hdr.timestamp = hdr.timestamp.round(time::Duration::from_millis(1)).unwrap();
339
340        hdr.encode_into(&mut buf).expect("failed to encode header");
341        let hdr_out = Header::decode_from(buf.as_slice()).expect("failed to decode header");
342
343        assert_eq!(hdr_out, hdr);
344    }
345
346    #[test]
347    fn snapshot_header() {
348        encode_decode_header(Header {
349            flags: HeaderFlags::COMPRESS_LZ4,
350            page_size: PageSize::new(4096).unwrap(),
351            commit: PageNum::new(10).unwrap(),
352            min_txid: TXID::new(1).unwrap(),
353            max_txid: TXID::new(5).unwrap(),
354            timestamp: time::SystemTime::now(),
355            pre_apply_checksum: None,
356        });
357    }
358
359    #[test]
360    fn non_snapshot_header() {
361        encode_decode_header(Header {
362            flags: HeaderFlags::COMPRESS_LZ4,
363            page_size: PageSize::new(4096).unwrap(),
364            commit: PageNum::new(10).unwrap(),
365            min_txid: TXID::new(3).unwrap(),
366            max_txid: TXID::new(5).unwrap(),
367            timestamp: time::SystemTime::now(),
368            pre_apply_checksum: Some(Checksum::new(123)),
369        });
370    }
371
372    #[test]
373    fn validate_header() {
374        let hdr = Header {
375            flags: HeaderFlags::COMPRESS_LZ4,
376            page_size: PageSize::new(4096).unwrap(),
377            commit: PageNum::new(10).unwrap(),
378            min_txid: TXID::new(5).unwrap(),
379            max_txid: TXID::new(3).unwrap(),
380            timestamp: time::SystemTime::now(),
381            pre_apply_checksum: Some(Checksum::new(123)),
382        };
383        assert!(matches!(
384            hdr.validate(),
385            Err(HeaderValidateError::TXIDOrder(min, max)) if min == hdr.min_txid && max == hdr.max_txid));
386
387        let hdr = Header {
388            flags: HeaderFlags::COMPRESS_LZ4,
389            page_size: PageSize::new(4096).unwrap(),
390            commit: PageNum::new(10).unwrap(),
391            min_txid: TXID::new(1).unwrap(),
392            max_txid: TXID::new(3).unwrap(),
393            timestamp: time::SystemTime::now(),
394            pre_apply_checksum: Some(Checksum::new(123)),
395        };
396        assert!(matches!(
397            hdr.validate(),
398            Err(HeaderValidateError::PreApplyChecksumOnSnapshot)
399        ));
400
401        let hdr = Header {
402            flags: HeaderFlags::COMPRESS_LZ4,
403            page_size: PageSize::new(4096).unwrap(),
404            commit: PageNum::new(10).unwrap(),
405            min_txid: TXID::new(3).unwrap(),
406            max_txid: TXID::new(5).unwrap(),
407            timestamp: time::SystemTime::now(),
408            pre_apply_checksum: None,
409        };
410        assert!(matches!(
411            hdr.validate(),
412            Err(HeaderValidateError::NoPreApplyChecksum)
413        ));
414    }
415
416    #[test]
417    fn trailer() {
418        let mut buf = Vec::new();
419
420        let trailer = Trailer {
421            post_apply_checksum: Checksum::new(123),
422            file_checksum: Checksum::new(123),
423        };
424        trailer
425            .encode_into(&mut buf)
426            .expect("failed to encode trailer");
427        let trailer_out = Trailer::decode_from(buf.as_slice()).expect("failed to decode trailer");
428
429        assert_eq!(trailer_out, trailer);
430    }
431
432    #[test]
433    fn page_header() {
434        let mut buf = Vec::new();
435
436        let page_header = PageHeader(Some(PageNum::new(10).unwrap()));
437        page_header
438            .encode_into(&mut buf)
439            .expect("failed to encode page header");
440        let page_header_out =
441            PageHeader::decode_from(buf.as_slice()).expect("failed to decode page header");
442
443        assert_eq!(page_header_out, page_header);
444    }
445
446    #[test]
447    fn empty_page_header() {
448        let mut buf = Vec::new();
449
450        let page_header = PageHeader(None);
451        page_header
452            .encode_into(&mut buf)
453            .expect("failed to encode page header");
454        let page_header_out =
455            PageHeader::decode_from(buf.as_slice()).expect("failed to decode page header");
456
457        assert_eq!(page_header_out, page_header);
458    }
459}