1use crate::buffer::PAGE_SIZE;
2use crate::catalog::SchemaRef;
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::storage::codec::{CommonCodec, DecodedData};
5use crate::storage::page::{RecordId, TablePage, TablePageHeader, TupleInfo, TupleMeta};
6use crate::utils::util::page_bytes_to_array;
7
8pub struct TablePageCodec;
9
10impl TablePageCodec {
11 pub fn encode(page: &TablePage) -> Vec<u8> {
12 let header_bytes = TablePageHeaderCodec::encode(&page.header);
13 let mut all_bytes = page.data;
14 all_bytes[0..header_bytes.len()].copy_from_slice(&header_bytes);
15 all_bytes.to_vec()
16 }
17
18 pub fn decode(bytes: &[u8], schema: SchemaRef) -> QuillSQLResult<DecodedData<TablePage>> {
19 if bytes.len() != PAGE_SIZE {
20 return Err(QuillSQLError::Storage(format!(
21 "Table page size is not {} instead of {}",
22 PAGE_SIZE,
23 bytes.len()
24 )));
25 }
26 let (header, _) = TablePageHeaderCodec::decode(bytes)?;
27 Ok((
28 TablePage {
29 schema,
30 header,
31 data: page_bytes_to_array(&bytes[0..PAGE_SIZE]),
32 },
33 PAGE_SIZE,
34 ))
35 }
36}
37
38pub struct TablePageHeaderCodec;
39
40impl TablePageHeaderCodec {
41 pub fn encode(header: &TablePageHeader) -> Vec<u8> {
42 let mut bytes = Vec::new();
43 bytes.extend(CommonCodec::encode_u64(header.lsn));
44 bytes.extend(CommonCodec::encode_u32(header.next_page_id));
45 bytes.extend(CommonCodec::encode_u16(header.num_tuples));
46 bytes.extend(CommonCodec::encode_u16(header.num_deleted_tuples));
47 for tuple_info in header.tuple_infos.iter() {
48 bytes.extend(TablePageHeaderTupleInfoCodec::encode(tuple_info));
49 }
50 bytes
51 }
52
53 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<TablePageHeader>> {
54 let mut left_bytes = bytes;
55
56 let (lsn, offset) = CommonCodec::decode_u64(left_bytes)?;
57 left_bytes = &left_bytes[offset..];
58
59 let (next_page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
60 left_bytes = &left_bytes[offset..];
61
62 let (num_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
63 left_bytes = &left_bytes[offset..];
64
65 let (num_deleted_tuples, offset) = CommonCodec::decode_u16(left_bytes)?;
66 left_bytes = &left_bytes[offset..];
67
68 let mut tuple_infos = vec![];
69 for _ in 0..num_tuples {
70 let (tuple_info, offset) = TablePageHeaderTupleInfoCodec::decode(left_bytes)?;
71 left_bytes = &left_bytes[offset..];
72 tuple_infos.push(tuple_info);
73 }
74 Ok((
75 TablePageHeader {
76 lsn,
77 next_page_id,
78 num_tuples,
79 num_deleted_tuples,
80 tuple_infos,
81 },
82 bytes.len() - left_bytes.len(),
83 ))
84 }
85}
86
87pub struct TablePageHeaderTupleInfoCodec;
88
89impl TablePageHeaderTupleInfoCodec {
90 pub fn encode(tuple_info: &TupleInfo) -> Vec<u8> {
91 let mut bytes = Vec::new();
92 bytes.extend(CommonCodec::encode_u16(tuple_info.offset));
93 bytes.extend(CommonCodec::encode_u16(tuple_info.size));
94 bytes.extend(CommonCodec::encode_u64(tuple_info.meta.insert_txn_id));
95 bytes.extend(CommonCodec::encode_u32(tuple_info.meta.insert_cid));
96 bytes.extend(CommonCodec::encode_u64(tuple_info.meta.delete_txn_id));
97 bytes.extend(CommonCodec::encode_u32(tuple_info.meta.delete_cid));
98 bytes.extend(CommonCodec::encode_bool(tuple_info.meta.is_deleted));
99 bytes.extend(CommonCodec::encode_bool(
100 tuple_info.meta.next_version.is_some(),
101 ));
102 if let Some(next) = tuple_info.meta.next_version {
103 bytes.extend(RidCodec::encode(&next));
104 }
105 bytes.extend(CommonCodec::encode_bool(
106 tuple_info.meta.prev_version.is_some(),
107 ));
108 if let Some(prev) = tuple_info.meta.prev_version {
109 bytes.extend(RidCodec::encode(&prev));
110 }
111 bytes
112 }
113
114 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<TupleInfo>> {
115 let mut left_bytes = bytes;
116 let (tuple_offset, offset) = CommonCodec::decode_u16(left_bytes)?;
117 left_bytes = &left_bytes[offset..];
118 let (size, offset) = CommonCodec::decode_u16(left_bytes)?;
119 left_bytes = &left_bytes[offset..];
120 let (insert_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?;
121 left_bytes = &left_bytes[offset..];
122 let (insert_cid, offset) = CommonCodec::decode_u32(left_bytes)?;
123 left_bytes = &left_bytes[offset..];
124 let (delete_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?;
125 left_bytes = &left_bytes[offset..];
126 let (delete_cid, offset) = CommonCodec::decode_u32(left_bytes)?;
127 left_bytes = &left_bytes[offset..];
128 let (is_deleted, offset) = CommonCodec::decode_bool(left_bytes)?;
129 left_bytes = &left_bytes[offset..];
130 let (has_next, offset) = CommonCodec::decode_bool(left_bytes)?;
131 left_bytes = &left_bytes[offset..];
132 let (next_version, consumed_next) = if has_next {
133 let (rid, offset) = RidCodec::decode(left_bytes)?;
134 (Some(rid), offset)
135 } else {
136 (None, 0)
137 };
138 left_bytes = &left_bytes[consumed_next..];
139 let (has_prev, offset) = CommonCodec::decode_bool(left_bytes)?;
140 left_bytes = &left_bytes[offset..];
141 let (prev_version, consumed_prev) = if has_prev {
142 let (rid, offset) = RidCodec::decode(left_bytes)?;
143 (Some(rid), offset)
144 } else {
145 (None, 0)
146 };
147 left_bytes = &left_bytes[consumed_prev..];
148 Ok((
149 TupleInfo {
150 offset: tuple_offset,
151 size,
152 meta: TupleMeta {
153 insert_txn_id,
154 insert_cid,
155 delete_txn_id,
156 delete_cid,
157 is_deleted,
158 next_version,
159 prev_version,
160 },
161 },
162 bytes.len() - left_bytes.len(),
163 ))
164 }
165}
166
167pub struct RidCodec;
168
169impl RidCodec {
170 pub fn encode(rid: &RecordId) -> Vec<u8> {
171 let mut bytes = vec![];
172 bytes.extend(CommonCodec::encode_u32(rid.page_id));
173 bytes.extend(CommonCodec::encode_u32(rid.slot_num));
174 bytes
175 }
176
177 pub fn decode(bytes: &[u8]) -> QuillSQLResult<DecodedData<RecordId>> {
178 let mut left_bytes = bytes;
179
180 let (page_id, offset) = CommonCodec::decode_u32(left_bytes)?;
181 left_bytes = &left_bytes[offset..];
182
183 let (slot_num, offset) = CommonCodec::decode_u32(left_bytes)?;
184 left_bytes = &left_bytes[offset..];
185
186 Ok((
187 RecordId::new(page_id, slot_num),
188 bytes.len() - left_bytes.len(),
189 ))
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use crate::buffer::INVALID_PAGE_ID;
196 use crate::catalog::{Column, DataType, Schema};
197 use crate::storage::codec::table_page::TablePageHeaderCodec;
198 use crate::storage::codec::TablePageCodec;
199 use crate::storage::page::{TablePage, TupleMeta};
200 use crate::storage::tuple::Tuple;
201 use std::sync::Arc;
202
203 #[test]
204 fn table_page_codec() {
205 let schema = Arc::new(Schema::new(vec![
206 Column::new("a", DataType::Int8, true),
207 Column::new("b", DataType::Int32, true),
208 ]));
209 let tuple1 = Tuple::new(schema.clone(), vec![1i8.into(), 1i32.into()]);
210 let tuple1_meta = TupleMeta::new(1, 0);
211 let tuple2 = Tuple::new(schema.clone(), vec![2i8.into(), 2i32.into()]);
212 let mut tuple2_meta = TupleMeta::new(3, 0);
213 tuple2_meta.mark_deleted(4, 0);
214
215 let mut table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
216 table_page.set_lsn(42);
217 table_page.insert_tuple(&tuple1_meta, &tuple1).unwrap();
218 table_page.insert_tuple(&tuple2_meta, &tuple2).unwrap();
219
220 let (new_page, _) =
221 TablePageCodec::decode(&TablePageCodec::encode(&table_page), schema.clone()).unwrap();
222 assert_eq!(new_page.schema, table_page.schema);
223 assert_eq!(new_page.header, table_page.header);
224 let header_size = TablePageHeaderCodec::encode(&table_page.header).len();
225 assert_eq!(new_page.data[header_size..], table_page.data[header_size..]);
226 }
227}