1use crate::types::{Checksum, PageNum, PageNumError, PageSize, PageSizeError, TXIDError, TXID};
2use std::{io, time};
3
/// CRC-64 instance built from the `crc` crate's `CRC_64_GO_ISO` parameters.
///
/// Used by [`PageChecksum::page_checksum`] to compute per-page checksums.
pub(crate) const CRC64: crc::Crc<u64> = crc::Crc::<u64>::new(&crc::CRC_64_GO_ISO);
5
bitflags::bitflags! {
    /// Bit flags stored in the 32-bit flags field of a file [`Header`].
    ///
    /// Header decoding goes through `HeaderFlags::from_bits`, so a value that
    /// contains bits not declared here is rejected.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct HeaderFlags: u32 {
        /// NOTE(review): presumably marks the page data as LZ4-compressed —
        /// confirm against the file format specification.
        const COMPRESS_LZ4 = 0b00000001;
    }
}
12
/// Reasons a [`Header`] fails its internal consistency checks.
#[derive(thiserror::Error, Debug)]
pub enum HeaderValidateError {
    /// The minimum transaction ID is greater than the maximum transaction ID.
    /// Carries `(min_txid, max_txid)`.
    #[error("transaction ids out of order: ({0}, {1})")]
    TXIDOrder(TXID, TXID),
    /// A snapshot header (minimum transaction ID equal to `TXID::ONE`) carries
    /// a pre-apply checksum.
    #[error("pre-apply checksum must be unset on snapshots")]
    PreApplyChecksumOnSnapshot,
    /// A non-snapshot header has no pre-apply checksum.
    #[error("pre-apply checksum required on non-snapshot files")]
    NoPreApplyChecksum,
}
23
/// Errors returned while encoding a [`Header`].
#[derive(thiserror::Error, Debug)]
pub enum HeaderEncodeError {
    /// The header failed its consistency checks.
    #[error("validation failed")]
    Validation(#[from] HeaderValidateError),
    /// The header timestamp could not be expressed as a duration since the
    /// UNIX epoch (it is earlier than the epoch).
    #[error("invalid timestamp: {0}")]
    Timestamp(time::SystemTimeError),
    /// Writing the encoded bytes to the destination failed.
    #[error("write error")]
    Write(#[from] io::Error),
}
34
/// Errors returned while decoding a [`Header`].
#[derive(thiserror::Error, Debug)]
pub enum HeaderDecodeError {
    /// Reading the header bytes from the source failed.
    #[error("read error")]
    Read(#[from] io::Error),
    /// The first four bytes do not match the expected magic; carries the
    /// bytes that were read.
    #[error("invalid magic record: {0:?}")]
    Magic([u8; 4]),
    /// The flags field contains bits that are not valid [`HeaderFlags`];
    /// carries the raw value.
    #[error("invalid flags record: {0:x}")]
    Flags(u32),
    /// The page size field was rejected by `PageSize::new`.
    #[error("invalid page size record")]
    PageSize(#[from] PageSizeError),
    /// The commit field was rejected by `PageNum::new`.
    #[error("invalid commit record: {0}")]
    Commit(PageNumError),
    /// The minimum transaction ID field was rejected by `TXID::new`.
    #[error("invalid min TX ID record: {0}")]
    MinTXID(TXIDError),
    /// The maximum transaction ID field was rejected by `TXID::new`.
    #[error("invalid max TX ID record: {0}")]
    MaxTXID(TXIDError),
    /// The timestamp (milliseconds since the UNIX epoch) cannot be represented
    /// as a `SystemTime`; carries the raw value.
    #[error("invalid timestamp: {0}")]
    Timestamp(u64),
    /// All fields were parsed but the resulting header failed its consistency
    /// checks.
    #[error("validation failed")]
    Validation(#[from] HeaderValidateError),
}
57
/// Size in bytes of an encoded [`Header`]; the encoder zero-pads up to this length.
pub(crate) const HEADER_SIZE: usize = 100;
/// Size in bytes of an encoded [`Trailer`] (two 8-byte checksums).
pub(crate) const TRAILER_SIZE: usize = 16;
/// Size in bytes of an encoded [`PageHeader`] (one 32-bit page number).
pub(crate) const PAGE_HEADER_SIZE: usize = 4;
61
/// File header.
///
/// Encoded as `HEADER_SIZE` bytes: a 4-byte magic followed by the fields below
/// in declaration order, all big-endian, then zero padding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    /// Header flags (32-bit field).
    pub flags: HeaderFlags,
    /// Page size (32-bit field).
    pub page_size: PageSize,
    /// Commit record, stored as a 32-bit page number.
    /// NOTE(review): presumably the database size in pages after the file is
    /// applied — confirm against the file format specification.
    pub commit: PageNum,
    /// Minimum transaction ID (64-bit field). A value equal to `TXID::ONE`
    /// makes the header a snapshot header.
    pub min_txid: TXID,
    /// Maximum transaction ID (64-bit field). Must not be smaller than
    /// `min_txid`.
    pub max_txid: TXID,
    /// Timestamp, encoded as whole milliseconds since the UNIX epoch, so
    /// sub-millisecond precision does not survive an encode/decode round trip.
    pub timestamp: time::SystemTime,
    /// Pre-apply checksum (64-bit field, `None` is encoded as `0`). Must be
    /// `None` for snapshot headers and `Some` for all other headers.
    pub pre_apply_checksum: Option<Checksum>,
}
82
83impl Header {
84 const MAGIC: &'static str = "LTX1";
85
86 pub(crate) fn is_snapshot(&self) -> bool {
87 self.min_txid == TXID::ONE
88 }
89
90 fn validate(&self) -> Result<(), HeaderValidateError> {
91 if self.min_txid > self.max_txid {
92 return Err(HeaderValidateError::TXIDOrder(self.min_txid, self.max_txid));
93 };
94
95 if self.is_snapshot() && self.pre_apply_checksum.is_some() {
96 return Err(HeaderValidateError::PreApplyChecksumOnSnapshot);
97 }
98
99 if !self.is_snapshot() && self.pre_apply_checksum.is_none() {
100 return Err(HeaderValidateError::NoPreApplyChecksum);
101 }
102
103 Ok(())
104 }
105
106 pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), HeaderEncodeError>
107 where
108 W: io::Write,
109 {
110 let mut buf = Vec::with_capacity(HEADER_SIZE);
111 let timestamp = self
112 .timestamp
113 .duration_since(time::SystemTime::UNIX_EPOCH)
114 .map_err(HeaderEncodeError::Timestamp)?
115 .as_millis() as u64;
116 let checksum = if let Some(c) = self.pre_apply_checksum {
117 c.into_inner()
118 } else {
119 0
120 };
121
122 self.validate()?;
123
124 buf.extend_from_slice(Self::MAGIC.as_bytes());
125 buf.extend_from_slice(&self.flags.bits().to_be_bytes());
126 buf.extend_from_slice(&self.page_size.into_inner().to_be_bytes());
127 buf.extend_from_slice(&self.commit.into_inner().to_be_bytes());
128 buf.extend_from_slice(&self.min_txid.into_inner().to_be_bytes());
129 buf.extend_from_slice(&self.max_txid.into_inner().to_be_bytes());
130 buf.extend_from_slice(×tamp.to_be_bytes());
131 buf.extend_from_slice(&checksum.to_be_bytes());
132 buf.resize(HEADER_SIZE, 0);
133
134 w.write_all(&buf)?;
135
136 Ok(())
137 }
138
139 pub(crate) fn decode_from<R>(mut r: R) -> Result<Header, HeaderDecodeError>
140 where
141 R: io::Read,
142 {
143 let mut buf = vec![0; HEADER_SIZE];
144 r.read_exact(&mut buf)?;
145
146 if &buf[0..4] != Self::MAGIC.as_bytes() {
147 return Err(HeaderDecodeError::Magic(buf[0..4].try_into().unwrap()));
148 }
149
150 let flags = u32::from_be_bytes(buf[4..8].try_into().unwrap());
151 let flags = HeaderFlags::from_bits(flags).ok_or(HeaderDecodeError::Flags(flags))?;
152
153 let page_size = u32::from_be_bytes(buf[8..12].try_into().unwrap());
154 let page_size = PageSize::new(page_size)?;
155
156 let commit = u32::from_be_bytes(buf[12..16].try_into().unwrap());
157 let commit = PageNum::new(commit).map_err(HeaderDecodeError::Commit)?;
158
159 let min_txid = u64::from_be_bytes(buf[16..24].try_into().unwrap());
160 let min_txid = TXID::new(min_txid).map_err(HeaderDecodeError::MinTXID)?;
161
162 let max_txid = u64::from_be_bytes(buf[24..32].try_into().unwrap());
163 let max_txid = TXID::new(max_txid).map_err(HeaderDecodeError::MaxTXID)?;
164
165 let timestamp = u64::from_be_bytes(buf[32..40].try_into().unwrap());
166 let timestamp = time::SystemTime::UNIX_EPOCH
167 .checked_add(time::Duration::from_millis(timestamp))
168 .ok_or(HeaderDecodeError::Timestamp(timestamp))?;
169
170 let pre_apply_checksum = u64::from_be_bytes(buf[40..48].try_into().unwrap());
171 let pre_apply_checksum = if pre_apply_checksum != 0 {
172 Some(Checksum::new(pre_apply_checksum))
173 } else {
174 None
175 };
176
177 let hdr = Header {
178 flags,
179 page_size,
180 commit,
181 min_txid,
182 max_txid,
183 timestamp,
184 pre_apply_checksum,
185 };
186
187 hdr.validate()?;
188
189 Ok(hdr)
190 }
191}
192
/// Errors returned while encoding a [`Trailer`].
#[derive(thiserror::Error, Debug)]
pub enum TrailerEncodeError {
    /// Writing the encoded bytes to the destination failed.
    #[error("write error")]
    Write(#[from] io::Error),
}
199
/// Errors returned while decoding a [`Trailer`].
#[derive(thiserror::Error, Debug)]
pub enum TrailerDecodeError {
    /// Reading the trailer bytes from the source failed.
    #[error("read error")]
    Read(#[from] io::Error),
    /// The stored post-apply checksum does not survive a round trip through
    /// `Checksum::new`; carries the raw stored value.
    #[error("invalid post apply checksum: {0}")]
    PostApplyChecksum(u64),
    /// The stored file checksum does not survive a round trip through
    /// `Checksum::new`; carries the raw stored value.
    #[error("invalid file checksum: {0}")]
    FileChecksum(u64),
}
210
/// File trailer.
///
/// Encoded as two big-endian 64-bit checksums: the post-apply checksum
/// followed by the file checksum.
#[derive(Debug, PartialEq, Eq)]
pub struct Trailer {
    /// Post-apply checksum (first 8 bytes of the encoded trailer).
    pub post_apply_checksum: Checksum,
    /// File checksum (last 8 bytes of the encoded trailer).
    pub file_checksum: Checksum,
}
219
220impl Trailer {
221 pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), TrailerEncodeError>
222 where
223 W: io::Write,
224 {
225 let mut buf = Vec::with_capacity(TRAILER_SIZE);
226
227 buf.extend_from_slice(&self.post_apply_checksum.into_inner().to_be_bytes());
228 buf.extend_from_slice(&self.file_checksum.into_inner().to_be_bytes());
229
230 w.write_all(&buf)?;
231
232 Ok(())
233 }
234
235 pub(crate) fn decode_from<R>(mut r: R) -> Result<Trailer, TrailerDecodeError>
236 where
237 R: io::Read,
238 {
239 let mut buf = [0; TRAILER_SIZE];
240 r.read_exact(&mut buf)?;
241
242 let post_apply_checksum = u64::from_be_bytes(buf[0..8].try_into().unwrap());
243 let file_checksum = u64::from_be_bytes(buf[8..16].try_into().unwrap());
244
245 let trailer = Trailer {
246 post_apply_checksum: Checksum::new(post_apply_checksum),
247 file_checksum: Checksum::new(file_checksum),
248 };
249 if trailer.post_apply_checksum.into_inner() != post_apply_checksum {
250 return Err(TrailerDecodeError::PostApplyChecksum(post_apply_checksum));
251 }
252 if trailer.file_checksum.into_inner() != file_checksum {
253 return Err(TrailerDecodeError::FileChecksum(file_checksum));
254 }
255
256 Ok(trailer)
257 }
258}
259
/// Errors returned while encoding a page header.
#[derive(thiserror::Error, Debug)]
pub enum PageHeaderEncodeError {
    /// Writing the encoded bytes to the destination failed.
    #[error("write error")]
    Write(#[from] io::Error),
}
266
/// Errors returned while decoding a page header.
#[derive(thiserror::Error, Debug)]
pub enum PageHeaderDecodeError {
    /// Reading the page header bytes from the source failed.
    #[error("read error")]
    Read(#[from] io::Error),
    /// The non-zero page number was rejected by `PageNum::new`.
    #[error("invalid page number record: {0}")]
    PageNum(PageNumError),
}
275
/// Page header: an optional page number encoded as a big-endian 32-bit
/// integer, where `None` is stored as `0`.
///
/// NOTE(review): a `None` header presumably marks the end of the page
/// stream — confirm against the callers.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct PageHeader(pub(crate) Option<PageNum>);
278
279impl PageHeader {
280 pub(crate) fn encode_into<W>(&self, mut w: W) -> Result<(), PageHeaderEncodeError>
281 where
282 W: io::Write,
283 {
284 let page_num = self.0.map(|n| n.into_inner()).unwrap_or(0);
285 w.write_all(&page_num.to_be_bytes())?;
286
287 Ok(())
288 }
289
290 pub(crate) fn decode_from<R>(mut r: R) -> Result<PageHeader, PageHeaderDecodeError>
291 where
292 R: io::Read,
293 {
294 let mut buf = [0; PAGE_HEADER_SIZE];
295 r.read_exact(&mut buf)?;
296
297 let page_num = u32::from_be_bytes(buf[0..4].try_into().unwrap());
298 let page_num = if page_num != 0 {
299 Some(PageNum::new(page_num).map_err(PageHeaderDecodeError::PageNum)?)
300 } else {
301 None
302 };
303
304 Ok(PageHeader(page_num))
305 }
306}
307
/// Computes the checksum of a page.
pub trait PageChecksum {
    /// Returns the checksum of `self` treated as the contents of page `pgno`.
    fn page_checksum(&self, pgno: PageNum) -> Checksum;
}
313
314impl<T> PageChecksum for T
315where
316 T: AsRef<[u8]>,
317{
318 fn page_checksum(&self, pgno: PageNum) -> Checksum {
319 let mut digest = CRC64.digest();
320
321 digest.update(&pgno.into_inner().to_be_bytes());
322 digest.update(self.as_ref());
323
324 Checksum::new(digest.finalize())
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::{Header, HeaderFlags, HeaderValidateError, PageHeader, Trailer};
    use crate::{utils::TimeRound, Checksum, PageNum, PageSize, TXID};
    use std::time;

    /// Builds a header with fixed flags, page size and commit, the current
    /// time as timestamp, and the given transaction range and checksum.
    fn header_with(min_txid: u64, max_txid: u64, pre_apply_checksum: Option<u64>) -> Header {
        Header {
            flags: HeaderFlags::COMPRESS_LZ4,
            page_size: PageSize::new(4096).unwrap(),
            commit: PageNum::new(10).unwrap(),
            min_txid: TXID::new(min_txid).unwrap(),
            max_txid: TXID::new(max_txid).unwrap(),
            timestamp: time::SystemTime::now(),
            pre_apply_checksum: pre_apply_checksum.map(Checksum::new),
        }
    }

    /// Encodes `header`, decodes the result and asserts both headers match.
    fn assert_header_round_trip(header: Header) {
        // The encoded timestamp has millisecond precision, so round the
        // expected value before comparing.
        let expected = Header {
            timestamp: header
                .timestamp
                .round(time::Duration::from_millis(1))
                .unwrap(),
            ..header
        };

        let mut encoded = Vec::new();
        expected
            .encode_into(&mut encoded)
            .expect("failed to encode header");
        let decoded = Header::decode_from(encoded.as_slice()).expect("failed to decode header");

        assert_eq!(decoded, expected);
    }

    #[test]
    fn snapshot_header() {
        assert_header_round_trip(header_with(1, 5, None));
    }

    #[test]
    fn non_snapshot_header() {
        assert_header_round_trip(header_with(3, 5, Some(123)));
    }

    #[test]
    fn validate_header() {
        // Minimum transaction ID greater than the maximum.
        let out_of_order = header_with(5, 3, Some(123));
        assert!(matches!(
            out_of_order.validate(),
            Err(HeaderValidateError::TXIDOrder(min, max))
                if min == out_of_order.min_txid && max == out_of_order.max_txid
        ));

        // Snapshot carrying a pre-apply checksum.
        let snapshot_with_checksum = header_with(1, 3, Some(123));
        assert!(matches!(
            snapshot_with_checksum.validate(),
            Err(HeaderValidateError::PreApplyChecksumOnSnapshot)
        ));

        // Non-snapshot without a pre-apply checksum.
        let missing_checksum = header_with(3, 5, None);
        assert!(matches!(
            missing_checksum.validate(),
            Err(HeaderValidateError::NoPreApplyChecksum)
        ));
    }

    #[test]
    fn trailer() {
        let expected = Trailer {
            post_apply_checksum: Checksum::new(123),
            file_checksum: Checksum::new(123),
        };

        let mut encoded = Vec::new();
        expected
            .encode_into(&mut encoded)
            .expect("failed to encode trailer");
        let decoded = Trailer::decode_from(encoded.as_slice()).expect("failed to decode trailer");

        assert_eq!(decoded, expected);
    }

    #[test]
    fn page_header() {
        let expected = PageHeader(Some(PageNum::new(10).unwrap()));

        let mut encoded = Vec::new();
        expected
            .encode_into(&mut encoded)
            .expect("failed to encode page header");
        let decoded =
            PageHeader::decode_from(encoded.as_slice()).expect("failed to decode page header");

        assert_eq!(decoded, expected);
    }

    #[test]
    fn empty_page_header() {
        let expected = PageHeader(None);

        let mut encoded = Vec::new();
        expected
            .encode_into(&mut encoded)
            .expect("failed to encode page header");
        let decoded =
            PageHeader::decode_from(encoded.as_slice()).expect("failed to decode page header");

        assert_eq!(decoded, expected);
    }
}