quill_sql/storage/codec/
table_page.rs

1use crate::buffer::PAGE_SIZE;
2use crate::catalog::SchemaRef;
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, DecodedData};
5use crate::storage::page::{RecordId, TablePage, TablePageHeader, TupleInfo, TupleMeta};
6use crate::utils::util::page_bytes_to_array;
7
8pub struct TablePageCodec;
9
10impl TablePageCodec {
11    pub fn encode(page: &TablePage) -> Vec<u8> {
12        let header_bytes = TablePageHeaderCodec::encode(&page.header);
13        let mut all_bytes = page.data;
14        all_bytes[0..header_bytes.len()].copy_from_slice(&header_bytes);
15        all_bytes.to_vec()
16    }
17
18    pub fn decode(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<DecodedData<TablePage>> {
19        if bytes.len() != PAGE_SIZE {
20            return Err(QuillSQLError::Storage(format!(
21                "Table page size is not {} instead of {}",
22                PAGE_SIZE,
23                bytes.len()
24            )));
25        }
26        let (header, _) = TablePageHeaderCodec::decode(bytes)?;
27        Ok((
28            TablePage {
29                schema,
30                header,
31                data: page_bytes_to_array(&bytes[0..PAGE_SIZE]),
32            },
33            PAGE_SIZE,
34        ))
35    }
36}
37
38pub struct TablePageHeaderCodec;
39
40impl TablePageHeaderCodec {
41    pub fn encode(header: &TablePageHeader) -> Vec<u8> {
42        let mut bytes = Vec::new();
43        bytes.extend(CommonCodec::encode_u64(header.lsn));
44        bytes.extend(CommonCodec::encode_u32(header.next_page_id));
45        bytes.extend(CommonCodec::encode_u16(header.num_tuples));
46        bytes.extend(CommonCodec::encode_u16(header.num_deleted_tuples));
47        for tuple_info in header.tuple_infos.iter() {
48            bytes.extend(TablePageHeaderTupleInfoCodec::encode(tuple_info));
49        }
50        bytes
51    }
52
53    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<TablePageHeader>> {
54        let mut left_bytes = bytes;
55
56        let (lsn, offset) = CommonCodec::decode_u64(left_bytes)?;
57        left_bytes = &left_bytes[offset..];
58
59        let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
60        left_bytes = &left_bytes[offset..];
61
62        let (num_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
63        left_bytes = &left_bytes[offset..];
64
65        let (num_deleted_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
66        left_bytes = &left_bytes[offset..];
67
68        let mut tuple_infos = vec![];
69        for _ in 0..num_tuples {
70            let (tuple_info, offset) = TablePageHeaderTupleInfoCodec::decode(left_bytes)?;
71            left_bytes = &left_bytes[offset..];
72            tuple_infos.push(tuple_info);
73        }
74        Ok((
75            TablePageHeader {
76                lsn,
77                next_page_id,
78                num_tuples,
79                num_deleted_tuples,
80                tuple_infos,
81            },
82            bytes.len() - left_bytes.len(),
83        ))
84    }
85}
86
87pub struct TablePageHeaderTupleInfoCodec;
88
89impl TablePageHeaderTupleInfoCodec {
90    pub fn encode(tuple_info: &TupleInfo) -> Vec<u8> {
91        let mut bytes = Vec::new();
92        bytes.extend(CommonCodec::encode_u16(tuple_info.offset));
93        bytes.extend(CommonCodec::encode_u16(tuple_info.size));
94        bytes.extend(CommonCodec::encode_u64(tuple_info.meta.insert_txn_id));
95        bytes.extend(CommonCodec::encode_u32(tuple_info.meta.insert_cid));
96        bytes.extend(CommonCodec::encode_u64(tuple_info.meta.delete_txn_id));
97        bytes.extend(CommonCodec::encode_u32(tuple_info.meta.delete_cid));
98        bytes.extend(CommonCodec::encode_bool(tuple_info.meta.is_deleted));
99        bytes.extend(CommonCodec::encode_bool(
100            tuple_info.meta.next_version.is_some(),
101        ));
102        if let Some(next) = tuple_info.meta.next_version {
103            bytes.extend(RidCodec::encode(&next));
104        }
105        bytes.extend(CommonCodec::encode_bool(
106            tuple_info.meta.prev_version.is_some(),
107        ));
108        if let Some(prev) = tuple_info.meta.prev_version {
109            bytes.extend(RidCodec::encode(&prev));
110        }
111        bytes
112    }
113
114    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<TupleInfo>> {
115        let mut left_bytes = bytes;
116        let (tuple_offset, offset) = CommonCodec::decode_u16(left_bytes)?;
117        left_bytes = &left_bytes[offset..];
118        let (size, offset) = CommonCodec::decode_u16(left_bytes)?;
119        left_bytes = &left_bytes[offset..];
120        let (insert_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?;
121        left_bytes = &left_bytes[offset..];
122        let (insert_cid, offset) = CommonCodec::decode_u32(left_bytes)?;
123        left_bytes = &left_bytes[offset..];
124        let (delete_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?;
125        left_bytes = &left_bytes[offset..];
126        let (delete_cid, offset) = CommonCodec::decode_u32(left_bytes)?;
127        left_bytes = &left_bytes[offset..];
128        let (is_deleted, offset) = CommonCodec::decode_bool(left_bytes)?;
129        left_bytes = &left_bytes[offset..];
130        let (has_next, offset) = CommonCodec::decode_bool(left_bytes)?;
131        left_bytes = &left_bytes[offset..];
132        let (next_version, consumed_next) = if has_next {
133            let (rid, offset) = RidCodec::decode(left_bytes)?;
134            (Some(rid), offset)
135        } else {
136            (None, 0)
137        };
138        left_bytes = &left_bytes[consumed_next..];
139        let (has_prev, offset) = CommonCodec::decode_bool(left_bytes)?;
140        left_bytes = &left_bytes[offset..];
141        let (prev_version, consumed_prev) = if has_prev {
142            let (rid, offset) = RidCodec::decode(left_bytes)?;
143            (Some(rid), offset)
144        } else {
145            (None, 0)
146        };
147        left_bytes = &left_bytes[consumed_prev..];
148        Ok((
149            TupleInfo {
150                offset: tuple_offset,
151                size,
152                meta: TupleMeta {
153                    insert_txn_id,
154                    insert_cid,
155                    delete_txn_id,
156                    delete_cid,
157                    is_deleted,
158                    next_version,
159                    prev_version,
160                },
161            },
162            bytes.len() - left_bytes.len(),
163        ))
164    }
165}
166
167pub struct RidCodec;
168
169impl RidCodec {
170    pub fn encode(rid: &RecordId) -> Vec<u8> {
171        let mut bytes = vec![];
172        bytes.extend(CommonCodec::encode_u32(rid.page_id));
173        bytes.extend(CommonCodec::encode_u32(rid.slot_num));
174        bytes
175    }
176
177    pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<RecordId>> {
178        let mut left_bytes = bytes;
179
180        let (page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
181        left_bytes = &left_bytes[offset..];
182
183        let (slot_num, offset) = CommonCodec::decode_u32(left_bytes)?;
184        left_bytes = &left_bytes[offset..];
185
186        Ok((
187            RecordId::new(page_id, slot_num),
188            bytes.len() - left_bytes.len(),
189        ))
190    }
191}
192
#[cfg(test)]
mod tests {
    use crate::buffer::INVALID_PAGE_ID;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::table_page::TablePageHeaderCodec;
    use crate::storage::codec::TablePageCodec;
    use crate::storage::page::{TablePage, TupleMeta};
    use crate::storage::tuple::Tuple;
    use std::sync::Arc;

    /// Round-trips a page holding two tuples (the second one marked deleted) through
    /// `TablePageCodec` and checks that schema, header and tuple bytes survive.
    #[test]
    fn table_page_codec() {
        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, true),
            Column::new("b", DataType::Int32, true),
        ]));

        // First tuple: plain insert.
        let first_meta = TupleMeta::new(1, 0);
        let first_tuple = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);

        // Second tuple: inserted, then marked deleted.
        let mut second_meta = TupleMeta::new(3, 0);
        second_meta.mark_deleted(4, 0);
        let second_tuple = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);

        let mut original = TablePage::new(schema.clone(), INVALID_PAGE_ID);
        original.set_lsn(42);
        original.insert_tuple(&first_meta, &first_tuple).unwrap();
        original.insert_tuple(&second_meta, &second_tuple).unwrap();

        let encoded = TablePageCodec::encode(&original);
        let (decoded, _) = TablePageCodec::decode(&encoded, schema.clone()).unwrap();

        assert_eq!(decoded.schema, original.schema);
        assert_eq!(decoded.header, original.header);
        // Compare only the bytes after the header region of the page image.
        let header_size = TablePageHeaderCodec::encode(&original.header).len();
        assert_eq!(decoded.data[header_size..], original.data[header_size..]);
    }
}
227}