1use std::io::{Read, Write};
2
3use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
4use crc32fast::Hasher;
5use fwob_core::Key;
6
7use crate::{Codec, Result, V2Error};
8
/// Four-byte magic that opens every serialized page header.
pub const PAGE_MAGIC: &[u8; 4] = b"FWP2";
/// Size in bytes of a serialized page header: the magic, four one-byte fields,
/// two CRC-32 values, two tagged key slots, three `u32` fields, one `u64`
/// field and ten trailing zero bytes.
pub const PAGE_HEADER_LEN: usize = 80;
/// Header version stamped by `PageHeader::new`; `PageHeader::read` rejects any other value.
const PAGE_HEADER_VERSION: u8 = 2;
/// Bytes occupied by a key's encoded value inside a header key slot (the tag byte is extra).
const KEY_SLOT_LEN: usize = 16;
13
/// Page encoding identifier, stored as a single byte in the page header.
///
/// The explicit discriminants are the ids accepted by `Encoding::from_id`.
// NOTE(review): the variant names suggest row-oriented vs. columnar payload
// layouts; the actual payload formats are not defined in this file — confirm
// against the encoder/decoder implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Encoding {
    /// Id 0.
    RowRawV1 = 0,
    /// Id 1.
    ColumnarBasicV1 = 1,
    /// Id 2.
    ColumnarDeltaV1 = 2,
}
21
22impl Encoding {
23 pub fn from_id(id: u8) -> Result<Self> {
24 match id {
25 0 => Ok(Self::RowRawV1),
26 1 => Ok(Self::ColumnarBasicV1),
27 2 => Ok(Self::ColumnarDeltaV1),
28 other => Err(V2Error::UnsupportedEncoding(other)),
29 }
30 }
31}
32
/// In-memory form of the fixed-size header that precedes a page payload.
///
/// Fields are listed in the order they are serialized by `PageHeader::write`
/// (after the magic bytes); multi-byte integers are little-endian on disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageHeader {
    /// Header layout version; `PageHeader::read` only accepts the current version.
    pub header_version: u8,
    /// Codec id, stored as one byte.
    // NOTE(review): presumably the compression codec applied to the payload — confirm.
    pub codec: Codec,
    /// Payload encoding id, stored as one byte.
    pub encoding: Encoding,
    /// Flag byte. `PageHeader::new` sets it to 0 and nothing in this file interprets it.
    pub flags: u8,
    /// CRC-32 of the serialized header computed with this field set to zero.
    pub header_crc32: u32,
    /// CRC-32 of the payload bytes; checked by `PageHeader::validate_payload`.
    pub payload_crc32: u32,
    /// Key stored in the first key slot.
    // NOTE(review): presumably the key of the first frame in the page — confirm.
    pub first_key: Key,
    /// Key stored in the second key slot.
    // NOTE(review): presumably the key of the last frame in the page — confirm.
    pub last_key: Key,
    /// Frame count supplied by the caller of `PageHeader::new`.
    pub frame_count: u32,
    /// Uncompressed length supplied by the caller; not validated in this file.
    pub uncompressed_len: u32,
    /// Compressed length supplied by the caller; not validated in this file.
    pub compressed_len: u32,
    /// Index of the page's first frame; can be changed with
    /// `PageHeader::set_first_frame_index`, which also refreshes `header_crc32`.
    pub first_frame_index: u64,
}
48
49impl PageHeader {
50 #[allow(clippy::too_many_arguments)]
51 pub fn new(
52 codec: Codec,
53 encoding: Encoding,
54 first_key: Key,
55 last_key: Key,
56 frame_count: u32,
57 uncompressed_len: u32,
58 compressed_len: u32,
59 first_frame_index: u64,
60 payload: &[u8],
61 ) -> Self {
62 let payload_crc32 = crc32(payload);
63 let mut header = Self {
64 header_version: PAGE_HEADER_VERSION,
65 codec,
66 encoding,
67 flags: 0,
68 header_crc32: 0,
69 payload_crc32,
70 first_key,
71 last_key,
72 frame_count,
73 uncompressed_len,
74 compressed_len,
75 first_frame_index,
76 };
77 header.header_crc32 = crc32(&header.bytes_with_zero_crc());
78 header
79 }
80
81 pub fn read<R: Read>(reader: &mut R, page_index: u64) -> Result<Self> {
82 let mut magic = [0u8; 4];
83 reader.read_exact(&mut magic)?;
84 if &magic != PAGE_MAGIC {
85 return Err(V2Error::InvalidPageHeader(page_index));
86 }
87 let header_version = reader.read_u8()?;
88 if header_version != PAGE_HEADER_VERSION {
89 return Err(V2Error::InvalidPageHeader(page_index));
90 }
91 let codec = Codec::from_id(reader.read_u8()?)?;
92 let encoding = Encoding::from_id(reader.read_u8()?)?;
93 let flags = reader.read_u8()?;
94 let header_crc32 = reader.read_u32::<LittleEndian>()?;
95 let payload_crc32 = reader.read_u32::<LittleEndian>()?;
96 let first_key = read_key(reader)?;
97 let last_key = read_key(reader)?;
98 let frame_count = reader.read_u32::<LittleEndian>()?;
99 let uncompressed_len = reader.read_u32::<LittleEndian>()?;
100 let compressed_len = reader.read_u32::<LittleEndian>()?;
101 let first_frame_index = reader.read_u64::<LittleEndian>()?;
102
103 let mut reserved = [0u8; 10];
104 reader.read_exact(&mut reserved)?;
105
106 let header = Self {
107 header_version,
108 codec,
109 encoding,
110 flags,
111 header_crc32,
112 payload_crc32,
113 first_key,
114 last_key,
115 frame_count,
116 uncompressed_len,
117 compressed_len,
118 first_frame_index,
119 };
120 if crc32(&header.bytes_with_zero_crc()) != header.header_crc32 {
121 return Err(V2Error::InvalidPageHeader(page_index));
122 }
123 Ok(header)
124 }
125
126 pub fn write<W: Write>(&self, writer: &mut W) -> Result<()> {
127 writer.write_all(PAGE_MAGIC)?;
128 writer.write_u8(self.header_version)?;
129 writer.write_u8(self.codec as u8)?;
130 writer.write_u8(self.encoding as u8)?;
131 writer.write_u8(self.flags)?;
132 writer.write_u32::<LittleEndian>(self.header_crc32)?;
133 writer.write_u32::<LittleEndian>(self.payload_crc32)?;
134 write_key(writer, self.first_key)?;
135 write_key(writer, self.last_key)?;
136 writer.write_u32::<LittleEndian>(self.frame_count)?;
137 writer.write_u32::<LittleEndian>(self.uncompressed_len)?;
138 writer.write_u32::<LittleEndian>(self.compressed_len)?;
139 writer.write_u64::<LittleEndian>(self.first_frame_index)?;
140 writer.write_all(&[0u8; 10])?;
141 Ok(())
142 }
143
144 pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
145 if crc32(payload) == self.payload_crc32 {
146 Ok(())
147 } else {
148 Err(V2Error::ChecksumMismatch)
149 }
150 }
151
152 pub fn set_first_frame_index(&mut self, first_frame_index: u64) {
153 self.first_frame_index = first_frame_index;
154 self.header_crc32 = crc32(&self.bytes_with_zero_crc());
155 }
156
157 fn bytes_with_zero_crc(&self) -> Vec<u8> {
158 let mut bytes = Vec::with_capacity(PAGE_HEADER_LEN);
159 bytes.extend_from_slice(PAGE_MAGIC);
160 bytes.push(self.header_version);
161 bytes.push(self.codec as u8);
162 bytes.push(self.encoding as u8);
163 bytes.push(self.flags);
164 bytes.extend_from_slice(&0u32.to_le_bytes());
165 bytes.extend_from_slice(&self.payload_crc32.to_le_bytes());
166 write_key(&mut bytes, self.first_key).unwrap();
167 write_key(&mut bytes, self.last_key).unwrap();
168 bytes.extend_from_slice(&self.frame_count.to_le_bytes());
169 bytes.extend_from_slice(&self.uncompressed_len.to_le_bytes());
170 bytes.extend_from_slice(&self.compressed_len.to_le_bytes());
171 bytes.extend_from_slice(&self.first_frame_index.to_le_bytes());
172 bytes.extend_from_slice(&[0u8; 10]);
173 bytes
174 }
175}
176
177fn crc32(bytes: &[u8]) -> u32 {
178 let mut hasher = Hasher::new();
179 hasher.update(bytes);
180 hasher.finalize()
181}
182
183fn write_key<W: Write>(writer: &mut W, key: Key) -> std::io::Result<()> {
184 let tag = match key {
185 Key::I8(_) => 0,
186 Key::I16(_) => 1,
187 Key::I32(_) => 2,
188 Key::I64(_) => 3,
189 Key::U8(_) => 4,
190 Key::U16(_) => 5,
191 Key::U32(_) => 6,
192 Key::U64(_) => 7,
193 Key::F32(_) => 8,
194 Key::F64(_) => 9,
195 Key::Decimal(_) => 10,
196 };
197 writer.write_all(&[tag])?;
198 let mut raw = Vec::with_capacity(KEY_SLOT_LEN);
199 key.encode(&mut raw);
200 raw.resize(KEY_SLOT_LEN, 0);
201 writer.write_all(&raw)?;
202 Ok(())
203}
204
205fn read_key<R: Read>(reader: &mut R) -> Result<Key> {
206 let tag = reader.read_u8()?;
207 let mut raw = [0u8; KEY_SLOT_LEN];
208 reader.read_exact(&mut raw)?;
209 Ok(match tag {
210 0 => Key::I8(raw[0] as i8),
211 1 => Key::I16(i16::from_le_bytes(raw[..2].try_into().unwrap())),
212 2 => Key::I32(i32::from_le_bytes(raw[..4].try_into().unwrap())),
213 3 => Key::I64(i64::from_le_bytes(raw[..8].try_into().unwrap())),
214 4 => Key::U8(raw[0]),
215 5 => Key::U16(u16::from_le_bytes(raw[..2].try_into().unwrap())),
216 6 => Key::U32(u32::from_le_bytes(raw[..4].try_into().unwrap())),
217 7 => Key::U64(u64::from_le_bytes(raw[..8].try_into().unwrap())),
218 8 => Key::F32(f32::from_le_bytes(raw[..4].try_into().unwrap())),
219 9 => Key::F64(f64::from_le_bytes(raw[..8].try_into().unwrap())),
220 10 => Key::decode(fwob_core::KeyType::Decimal, &raw)?,
221 _ => return Err(V2Error::InvalidPageHeader(0)),
222 })
223}