Skip to main content

fwob_v2/
page.rs

1use std::io::{Read, Write};
2
3use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
4use crc32fast::Hasher;
5use fwob_core::Key;
6
7use crate::{Codec, Result, V2Error};
8
9pub const PAGE_MAGIC: &[u8; 4] = b"FWP2";
10pub const PAGE_HEADER_LEN: usize = 80;
11const PAGE_HEADER_VERSION: u8 = 2;
12const KEY_SLOT_LEN: usize = 16;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15#[repr(u8)]
16pub enum Encoding {
17    RowRawV1 = 0,
18    ColumnarBasicV1 = 1,
19    ColumnarDeltaV1 = 2,
20}
21
22impl Encoding {
23    pub fn from_id(id: u8) -> Result<Self> {
24        match id {
25            0 => Ok(Self::RowRawV1),
26            1 => Ok(Self::ColumnarBasicV1),
27            2 => Ok(Self::ColumnarDeltaV1),
28            other => Err(V2Error::UnsupportedEncoding(other)),
29        }
30    }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct PageHeader {
35    pub header_version: u8,
36    pub codec: Codec,
37    pub encoding: Encoding,
38    pub flags: u8,
39    pub header_crc32: u32,
40    pub payload_crc32: u32,
41    pub first_key: Key,
42    pub last_key: Key,
43    pub frame_count: u32,
44    pub uncompressed_len: u32,
45    pub compressed_len: u32,
46    pub first_frame_index: u64,
47}
48
49impl PageHeader {
50    #[allow(clippy::too_many_arguments)]
51    pub fn new(
52        codec: Codec,
53        encoding: Encoding,
54        first_key: Key,
55        last_key: Key,
56        frame_count: u32,
57        uncompressed_len: u32,
58        compressed_len: u32,
59        first_frame_index: u64,
60        payload: &[u8],
61    ) -> Self {
62        let payload_crc32 = crc32(payload);
63        let mut header = Self {
64            header_version: PAGE_HEADER_VERSION,
65            codec,
66            encoding,
67            flags: 0,
68            header_crc32: 0,
69            payload_crc32,
70            first_key,
71            last_key,
72            frame_count,
73            uncompressed_len,
74            compressed_len,
75            first_frame_index,
76        };
77        header.header_crc32 = crc32(&header.bytes_with_zero_crc());
78        header
79    }
80
81    pub fn read<R: Read>(reader: &mut R, page_index: u64) -> Result<Self> {
82        let mut magic = [0u8; 4];
83        reader.read_exact(&mut magic)?;
84        if &magic != PAGE_MAGIC {
85            return Err(V2Error::InvalidPageHeader(page_index));
86        }
87        let header_version = reader.read_u8()?;
88        if header_version != PAGE_HEADER_VERSION {
89            return Err(V2Error::InvalidPageHeader(page_index));
90        }
91        let codec = Codec::from_id(reader.read_u8()?)?;
92        let encoding = Encoding::from_id(reader.read_u8()?)?;
93        let flags = reader.read_u8()?;
94        let header_crc32 = reader.read_u32::<LittleEndian>()?;
95        let payload_crc32 = reader.read_u32::<LittleEndian>()?;
96        let first_key = read_key(reader)?;
97        let last_key = read_key(reader)?;
98        let frame_count = reader.read_u32::<LittleEndian>()?;
99        let uncompressed_len = reader.read_u32::<LittleEndian>()?;
100        let compressed_len = reader.read_u32::<LittleEndian>()?;
101        let first_frame_index = reader.read_u64::<LittleEndian>()?;
102
103        let mut reserved = [0u8; 10];
104        reader.read_exact(&mut reserved)?;
105
106        let header = Self {
107            header_version,
108            codec,
109            encoding,
110            flags,
111            header_crc32,
112            payload_crc32,
113            first_key,
114            last_key,
115            frame_count,
116            uncompressed_len,
117            compressed_len,
118            first_frame_index,
119        };
120        if crc32(&header.bytes_with_zero_crc()) != header.header_crc32 {
121            return Err(V2Error::InvalidPageHeader(page_index));
122        }
123        Ok(header)
124    }
125
126    pub fn write<W: Write>(&self, writer: &mut W) -> Result<()> {
127        writer.write_all(PAGE_MAGIC)?;
128        writer.write_u8(self.header_version)?;
129        writer.write_u8(self.codec as u8)?;
130        writer.write_u8(self.encoding as u8)?;
131        writer.write_u8(self.flags)?;
132        writer.write_u32::<LittleEndian>(self.header_crc32)?;
133        writer.write_u32::<LittleEndian>(self.payload_crc32)?;
134        write_key(writer, self.first_key)?;
135        write_key(writer, self.last_key)?;
136        writer.write_u32::<LittleEndian>(self.frame_count)?;
137        writer.write_u32::<LittleEndian>(self.uncompressed_len)?;
138        writer.write_u32::<LittleEndian>(self.compressed_len)?;
139        writer.write_u64::<LittleEndian>(self.first_frame_index)?;
140        writer.write_all(&[0u8; 10])?;
141        Ok(())
142    }
143
144    pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
145        if crc32(payload) == self.payload_crc32 {
146            Ok(())
147        } else {
148            Err(V2Error::ChecksumMismatch)
149        }
150    }
151
152    pub fn set_first_frame_index(&mut self, first_frame_index: u64) {
153        self.first_frame_index = first_frame_index;
154        self.header_crc32 = crc32(&self.bytes_with_zero_crc());
155    }
156
157    fn bytes_with_zero_crc(&self) -> Vec<u8> {
158        let mut bytes = Vec::with_capacity(PAGE_HEADER_LEN);
159        bytes.extend_from_slice(PAGE_MAGIC);
160        bytes.push(self.header_version);
161        bytes.push(self.codec as u8);
162        bytes.push(self.encoding as u8);
163        bytes.push(self.flags);
164        bytes.extend_from_slice(&0u32.to_le_bytes());
165        bytes.extend_from_slice(&self.payload_crc32.to_le_bytes());
166        write_key(&mut bytes, self.first_key).unwrap();
167        write_key(&mut bytes, self.last_key).unwrap();
168        bytes.extend_from_slice(&self.frame_count.to_le_bytes());
169        bytes.extend_from_slice(&self.uncompressed_len.to_le_bytes());
170        bytes.extend_from_slice(&self.compressed_len.to_le_bytes());
171        bytes.extend_from_slice(&self.first_frame_index.to_le_bytes());
172        bytes.extend_from_slice(&[0u8; 10]);
173        bytes
174    }
175}
176
177fn crc32(bytes: &[u8]) -> u32 {
178    let mut hasher = Hasher::new();
179    hasher.update(bytes);
180    hasher.finalize()
181}
182
183fn write_key<W: Write>(writer: &mut W, key: Key) -> std::io::Result<()> {
184    let tag = match key {
185        Key::I8(_) => 0,
186        Key::I16(_) => 1,
187        Key::I32(_) => 2,
188        Key::I64(_) => 3,
189        Key::U8(_) => 4,
190        Key::U16(_) => 5,
191        Key::U32(_) => 6,
192        Key::U64(_) => 7,
193        Key::F32(_) => 8,
194        Key::F64(_) => 9,
195        Key::Decimal(_) => 10,
196    };
197    writer.write_all(&[tag])?;
198    let mut raw = Vec::with_capacity(KEY_SLOT_LEN);
199    key.encode(&mut raw);
200    raw.resize(KEY_SLOT_LEN, 0);
201    writer.write_all(&raw)?;
202    Ok(())
203}
204
205fn read_key<R: Read>(reader: &mut R) -> Result<Key> {
206    let tag = reader.read_u8()?;
207    let mut raw = [0u8; KEY_SLOT_LEN];
208    reader.read_exact(&mut raw)?;
209    Ok(match tag {
210        0 => Key::I8(raw[0] as i8),
211        1 => Key::I16(i16::from_le_bytes(raw[..2].try_into().unwrap())),
212        2 => Key::I32(i32::from_le_bytes(raw[..4].try_into().unwrap())),
213        3 => Key::I64(i64::from_le_bytes(raw[..8].try_into().unwrap())),
214        4 => Key::U8(raw[0]),
215        5 => Key::U16(u16::from_le_bytes(raw[..2].try_into().unwrap())),
216        6 => Key::U32(u32::from_le_bytes(raw[..4].try_into().unwrap())),
217        7 => Key::U64(u64::from_le_bytes(raw[..8].try_into().unwrap())),
218        8 => Key::F32(f32::from_le_bytes(raw[..4].try_into().unwrap())),
219        9 => Key::F64(f64::from_le_bytes(raw[..8].try_into().unwrap())),
220        10 => Key::decode(fwob_core::KeyType::Decimal, &raw)?,
221        _ => return Err(V2Error::InvalidPageHeader(0)),
222    })
223}