feophantlib/engine/io/page_formats/
page_data.rs1use crate::engine::io::format_traits::{Parseable, Serializable};
2use crate::engine::io::{ConstEncodedSize, EncodedSize};
3use crate::engine::objects::SqlTuple;
4use crate::engine::transactions::TransactionId;
5
6use super::super::super::objects::Table;
7use super::super::row_formats::{ItemPointer, RowData, RowDataError};
8use super::{
9 ItemIdData, ItemIdDataError, PageHeader, PageHeaderError, PageOffset, UInt12, UInt12Error,
10};
11use async_stream::stream;
12use bytes::{BufMut, Bytes};
13use futures::stream::Stream;
14use std::convert::TryFrom;
15use std::sync::Arc;
16use thiserror::Error;
17
18pub struct PageData {
19 page: PageOffset,
20 page_header: PageHeader,
21 item_ids: Vec<ItemIdData>,
22 rows: Vec<RowData>,
24}
25
26impl PageData {
27 pub fn new(page: PageOffset) -> PageData {
28 PageData {
29 page,
30 page_header: PageHeader::new(),
31 item_ids: vec![],
32 rows: vec![],
33 }
34 }
35
36 pub fn can_fit(&self, row_data_size: usize) -> bool {
38 self.page_header.can_fit(row_data_size)
39 }
40
41 pub fn insert(
42 &mut self,
43 current_tran_id: TransactionId,
44 table: &Arc<Table>,
45 user_data: SqlTuple,
46 ) -> Result<ItemPointer, PageDataError> {
47 let item_pointer = ItemPointer::new(self.page, UInt12::try_from(self.rows.len())?);
48 let row_data_len = RowData::encoded_size(&user_data);
49 let row_data = RowData::new(
50 table.sql_type.clone(),
51 current_tran_id,
52 None,
53 item_pointer,
54 user_data,
55 );
56
57 let item_data = self.page_header.add_item(row_data_len)?;
58 self.item_ids.push(item_data);
59 self.rows.push(row_data);
60 Ok(item_pointer)
61 }
62
63 pub fn update(&mut self, row_data: RowData, row_count: UInt12) -> Result<(), PageDataError> {
64 let row_data_len = RowData::encoded_size(&row_data.user_data);
65 let row_count = row_count.to_usize();
66 if row_count > self.item_ids.len() - 1 || row_count > self.rows.len() - 1 {
67 return Err(PageDataError::IndexOutofBounds(
68 row_count,
69 self.item_ids.len(),
70 self.rows.len(),
71 ));
72 }
73
74 let iid = &self.item_ids[row_count];
75 if iid.length.to_usize() != row_data_len {
76 return Err(PageDataError::UpdateChangedLength(
77 iid.length.to_usize(),
78 row_data_len,
79 ));
80 }
81
82 self.rows[row_count] = row_data;
83 Ok(())
84 }
85
86 pub fn get_row(&self, count: UInt12) -> Option<&RowData> {
87 self.rows.get(count.to_usize())
88 }
89
90 pub fn get_stream(&self) -> impl Stream<Item = RowData> {
91 let rows_clone = self.rows.clone();
92 stream! {
93 for row in rows_clone.iter() {
94 yield row.clone();
95 }
96 }
97 }
98
99 pub fn parse(
100 table: &Arc<Table>,
101 page: PageOffset,
102 buffer: &Bytes,
103 ) -> Result<PageData, PageDataError> {
104 let mut page_header_slice = &buffer[0..PageHeader::encoded_size()];
107 let page_header = PageHeader::parse(&mut page_header_slice)?;
108
109 let mut item_ids: Vec<ItemIdData> = Vec::with_capacity(page_header.get_item_count());
110 let mut rows: Vec<RowData> = Vec::with_capacity(page_header.get_item_count());
111 for i in 0..page_header.get_item_count() {
112 let iid_lower_offset = PageHeader::encoded_size() + (ItemIdData::encoded_size() * i);
113 let iid_upper_offset =
114 PageHeader::encoded_size() + (ItemIdData::encoded_size() * (i + 1));
115 let mut iid_slice = &buffer[iid_lower_offset..iid_upper_offset];
116 let iid = ItemIdData::parse(&mut iid_slice)?;
117
118 let mut row_slice = &buffer[iid.get_range()];
119 let row = RowData::parse(table.clone(), &mut row_slice)?;
120 item_ids.push(iid);
121 rows.push(row);
122 }
123
124 Ok(PageData {
125 page,
126 page_header,
127 item_ids,
128 rows,
129 })
130 }
131}
132
133impl Serializable for PageData {
134 fn serialize(&self, buffer: &mut impl BufMut) {
135 self.page_header.serialize(buffer);
136
137 self.item_ids.iter().for_each(|f| f.serialize(buffer));
139
140 let free_space = vec![0; self.page_header.get_free_space()];
142 buffer.put_slice(&free_space);
143
144 self.rows.iter().rev().for_each(|r| r.serialize(buffer));
146 }
147}
148
149#[derive(Debug, Error)]
150pub enum PageDataError {
151 #[error(transparent)]
152 PageHeaderParseError(#[from] PageHeaderError),
153 #[error(transparent)]
154 ItemIdDataParseError(#[from] ItemIdDataError),
155 #[error(transparent)]
156 RowDataParseError(#[from] RowDataError),
157 #[error(transparent)]
158 UInt12Error(#[from] UInt12Error),
159 #[error("Row {0} does not exist to update we have {1}:{2} rows")]
160 IndexOutofBounds(usize, usize, usize),
161 #[error("Updates cannot change row length! Old: {0} New: {1}")]
162 UpdateChangedLength(usize, usize),
163}
164
165#[cfg(test)]
166mod tests {
167 use crate::constants::PAGE_SIZE;
168 use crate::engine::get_table;
169 use crate::engine::objects::SqlTuple;
170
171 use super::super::super::super::objects::types::BaseSqlTypes;
172 use super::super::super::super::transactions::TransactionId;
173 use super::*;
174 use bytes::BytesMut;
175 use futures::pin_mut;
176 use tokio_stream::StreamExt;
177
178 fn get_item_pointer(row_num: usize) -> ItemPointer {
179 ItemPointer::new(PageOffset(0), UInt12::new(row_num as u16).unwrap())
180 }
181
182 #[tokio::test]
183 async fn test_page_data_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
184 let table = get_table();
185
186 let rows = vec!(RowData::new(table.sql_type.clone(),
187 TransactionId::new(0xDEADBEEF),
188 None,
189 get_item_pointer(0),
190 SqlTuple(vec![
191 Some(BaseSqlTypes::Text("this is a test".to_string())),
192 None,
193 Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
194 ]),
195 ));
196
197 let mut pd = PageData::new(PageOffset(0));
198 for r in rows.clone() {
199 assert!(pd.insert(r.min, &table, r.user_data).is_ok());
200 }
201 let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
202 pd.serialize(&mut serial);
203
204 assert_eq!(PAGE_SIZE as usize, serial.len());
205 let pg_parsed = PageData::parse(&table, PageOffset(0), &serial.freeze()).unwrap();
206
207 pin_mut!(pg_parsed);
208 let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
209 assert_eq!(rows, result_rows);
210
211 Ok(())
212 }
213
214 #[tokio::test]
215 async fn test_page_data_roundtrip_two_rows() {
216 let table = get_table();
217
218 let rows = vec!(RowData::new(table.sql_type.clone(),
219 TransactionId::new(0xDEADBEEF),
220 None,
221 get_item_pointer(0),
222 SqlTuple(vec![
223 Some(BaseSqlTypes::Text("this is a test".to_string())),
224 None,
225 Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
226 ]),
227 ), RowData::new(table.sql_type.clone(),
228 TransactionId::new(0xDEADBEEF),
229 None,
230 get_item_pointer(1),
231 SqlTuple(vec![
232 Some(BaseSqlTypes::Text("this also a test".to_string())),
233 None,
234 Some(BaseSqlTypes::Text("it would help if I didn't mix and match types".to_string())),
235 ]),
236 ));
237
238 let mut pd = PageData::new(PageOffset(0));
239 for r in rows.clone() {
240 assert!(pd.insert(r.min, &table, r.user_data).is_ok());
241 }
242 let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
243 pd.serialize(&mut serial);
244 let pg_parsed = PageData::parse(&table, PageOffset(0), &serial.freeze()).unwrap();
245
246 pin_mut!(pg_parsed);
247 let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
248 assert_eq!(rows, result_rows);
249 }
250
251 #[tokio::test]
252 async fn test_page_data_update() {
253 let table = get_table();
254
255 let mut row = RowData::new(table.sql_type.clone(),
256 TransactionId::new(0xDEADBEEF),
257 None,
258 get_item_pointer(0),
259 SqlTuple(vec![
260 Some(BaseSqlTypes::Text("this is a test".to_string())),
261 None,
262 Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
263 ]),
264 );
265
266 let mut pd = PageData::new(PageOffset(0));
267 let rip = pd.insert(row.min, &table, row.user_data.clone());
268 assert!(rip.is_ok());
269
270 let ip = rip.unwrap();
271
272 row.item_pointer = get_item_pointer(1);
273
274 assert!(pd.update(row.clone(), ip.count).is_ok());
275
276 pin_mut!(pd);
277 let result_rows: Vec<RowData> = pd.get_stream().collect().await;
278 assert_eq!(row, result_rows[0]);
279 }
280}