feophantlib/engine/io/page_formats/page_data.rs

use crate::engine::io::format_traits::{Parseable, Serializable};
use crate::engine::io::{ConstEncodedSize, EncodedSize};
use crate::engine::objects::SqlTuple;
use crate::engine::transactions::TransactionId;

use super::super::super::objects::Table;
use super::super::row_formats::{ItemPointer, RowData, RowDataError};
use super::{
    ItemIdData, ItemIdDataError, PageHeader, PageHeaderError, PageOffset, UInt12, UInt12Error,
};
use async_stream::stream;
use bytes::{BufMut, Bytes};
use futures::stream::Stream;
use std::convert::TryFrom;
use std::sync::Arc;
use thiserror::Error;

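/// In-memory representation of a single page: the page header, the item ids
/// that locate each row within the page, and the parsed rows themselves.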
pub struct PageData {
    page: PageOffset,
    page_header: PageHeader,
    item_ids: Vec<ItemIdData>,
    //TODO debating if I should defer parsing until later
    rows: Vec<RowData>,
}

impl PageData {
    /// Creates an empty page at the given offset.
    pub fn new(page: PageOffset) -> PageData {
        PageData {
            page,
            page_header: PageHeader::new(),
            item_ids: vec![],
            rows: vec![],
        }
    }

    /// Fast check of whether a row of the given encoded size still fits in this page.
    pub fn can_fit(&self, row_data_size: usize) -> bool {
        self.page_header.can_fit(row_data_size)
    }

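    /// Inserts a new row into the page: reserves space for it in the page
    /// header, records an item id for it, and returns a pointer to the new row.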
    pub fn insert(
        &mut self,
        current_tran_id: TransactionId,
        table: &Arc<Table>,
        user_data: SqlTuple,
    ) -> Result<ItemPointer, PageDataError> {
        let item_pointer = ItemPointer::new(self.page, UInt12::try_from(self.rows.len())?);
        let row_data_len = RowData::encoded_size(&user_data);
        let row_data = RowData::new(
            table.sql_type.clone(),
            current_tran_id,
            None,
            item_pointer,
            user_data,
        );

        let item_data = self.page_header.add_item(row_data_len)?;
        self.item_ids.push(item_data);
        self.rows.push(row_data);
        Ok(item_pointer)
    }

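    /// Replaces the row at the given index in place. The new row must have the
    /// same encoded length as the old one; otherwise `UpdateChangedLength` is returned.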
    pub fn update(&mut self, row_data: RowData, row_count: UInt12) -> Result<(), PageDataError> {
        let row_data_len = RowData::encoded_size(&row_data.user_data);
        let row_count = row_count.to_usize();
        //Bounds check with >= so an empty page can't cause an underflow
        if row_count >= self.item_ids.len() || row_count >= self.rows.len() {
            return Err(PageDataError::IndexOutofBounds(
                row_count,
                self.item_ids.len(),
                self.rows.len(),
            ));
        }

        let iid = &self.item_ids[row_count];
        if iid.length.to_usize() != row_data_len {
            return Err(PageDataError::UpdateChangedLength(
                iid.length.to_usize(),
                row_data_len,
            ));
        }

        self.rows[row_count] = row_data;
        Ok(())
    }

    /// Gets a reference to the row at the given index, if it exists.
    pub fn get_row(&self, count: UInt12) -> Option<&RowData> {
        self.rows.get(count.to_usize())
    }

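    /// Produces a stream over clones of the rows in this page, in insertion order.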
    pub fn get_stream(&self) -> impl Stream<Item = RowData> {
        let rows_clone = self.rows.clone();
        stream! {
            for row in rows_clone.iter() {
                yield row.clone();
            }
        }
    }

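    /// Parses a serialized page back into a PageData: reads the page header,
    /// then each item id, and uses each item id's range to locate and parse
    /// the corresponding row.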
    pub fn parse(
        table: &Arc<Table>,
        page: PageOffset,
        buffer: &Bytes,
    ) -> Result<PageData, PageDataError> {
        //Note: since we need random access, everything MUST work off slices, otherwise counts will be off

        let mut page_header_slice = &buffer[0..PageHeader::encoded_size()];
        let page_header = PageHeader::parse(&mut page_header_slice)?;

        let mut item_ids: Vec<ItemIdData> = Vec::with_capacity(page_header.get_item_count());
        let mut rows: Vec<RowData> = Vec::with_capacity(page_header.get_item_count());
        for i in 0..page_header.get_item_count() {
            let iid_lower_offset = PageHeader::encoded_size() + (ItemIdData::encoded_size() * i);
            let iid_upper_offset =
                PageHeader::encoded_size() + (ItemIdData::encoded_size() * (i + 1));
            let mut iid_slice = &buffer[iid_lower_offset..iid_upper_offset];
            let iid = ItemIdData::parse(&mut iid_slice)?;

            let mut row_slice = &buffer[iid.get_range()];
            let row = RowData::parse(table.clone(), &mut row_slice)?;
            item_ids.push(iid);
            rows.push(row);
        }

        Ok(PageData {
            page,
            page_header,
            item_ids,
            rows,
        })
    }
}

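/// Writes the page out as: page header, item ids, zero padding for the free
/// space, then the rows in reverse order.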
impl Serializable for PageData {
    fn serialize(&self, buffer: &mut impl BufMut) {
        self.page_header.serialize(buffer);

        //Now write the item ids in order
        self.item_ids.iter().for_each(|f| f.serialize(buffer));

        //Fill the free space with zeroes
        let free_space = vec![0; self.page_header.get_free_space()];
        buffer.put_slice(&free_space);

        //Write the rows in reverse order
        self.rows.iter().rev().for_each(|r| r.serialize(buffer));
    }
}

#[derive(Debug, Error)]
pub enum PageDataError {
    #[error(transparent)]
    PageHeaderParseError(#[from] PageHeaderError),
    #[error(transparent)]
    ItemIdDataParseError(#[from] ItemIdDataError),
    #[error(transparent)]
    RowDataParseError(#[from] RowDataError),
    #[error(transparent)]
    UInt12Error(#[from] UInt12Error),
    #[error("Row {0} does not exist to update, we have {1} item ids and {2} rows")]
    IndexOutofBounds(usize, usize, usize),
    #[error("Updates cannot change row length! Old: {0} New: {1}")]
    UpdateChangedLength(usize, usize),
}

#[cfg(test)]
mod tests {
    use crate::constants::PAGE_SIZE;
    use crate::engine::get_table;
    use crate::engine::objects::SqlTuple;

    use super::super::super::super::objects::types::BaseSqlTypes;
    use super::super::super::super::transactions::TransactionId;
    use super::*;
    use bytes::BytesMut;
    use futures::pin_mut;
    use tokio_stream::StreamExt;

    fn get_item_pointer(row_num: usize) -> ItemPointer {
        ItemPointer::new(PageOffset(0), UInt12::new(row_num as u16).unwrap())
    }

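    // Round trips a single row: serialize the page, check it is exactly PAGE_SIZE
    // long, parse it back, and confirm the row survives unchanged.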
    #[tokio::test]
    async fn test_page_data_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
        let table = get_table();

        let rows = vec![RowData::new(
            table.sql_type.clone(),
            TransactionId::new(0xDEADBEEF),
            None,
            get_item_pointer(0),
            SqlTuple(vec![
                Some(BaseSqlTypes::Text("this is a test".to_string())),
                None,
                Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
            ]),
        )];

        let mut pd = PageData::new(PageOffset(0));
        for r in rows.clone() {
            assert!(pd.insert(r.min, &table, r.user_data).is_ok());
        }
        let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
        pd.serialize(&mut serial);

        assert_eq!(PAGE_SIZE as usize, serial.len());
        let pg_parsed = PageData::parse(&table, PageOffset(0), &serial.freeze()).unwrap();

        pin_mut!(pg_parsed);
        let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
        assert_eq!(rows, result_rows);

        Ok(())
    }

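    // Same round trip with two rows, to check that multiple item ids and rows
    // are written and parsed back in the right order.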
    #[tokio::test]
    async fn test_page_data_roundtrip_two_rows() {
        let table = get_table();

        let rows = vec![
            RowData::new(
                table.sql_type.clone(),
                TransactionId::new(0xDEADBEEF),
                None,
                get_item_pointer(0),
                SqlTuple(vec![
                    Some(BaseSqlTypes::Text("this is a test".to_string())),
                    None,
                    Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
                ]),
            ),
            RowData::new(
                table.sql_type.clone(),
                TransactionId::new(0xDEADBEEF),
                None,
                get_item_pointer(1),
                SqlTuple(vec![
                    Some(BaseSqlTypes::Text("this also a test".to_string())),
                    None,
                    Some(BaseSqlTypes::Text("it would help if I didn't mix and match types".to_string())),
                ]),
            ),
        ];

        let mut pd = PageData::new(PageOffset(0));
        for r in rows.clone() {
            assert!(pd.insert(r.min, &table, r.user_data).is_ok());
        }
        let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
        pd.serialize(&mut serial);
        let pg_parsed = PageData::parse(&table, PageOffset(0), &serial.freeze()).unwrap();

        pin_mut!(pg_parsed);
        let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
        assert_eq!(rows, result_rows);
    }

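    // Inserts a row, modifies it (new item pointer) and updates it in place,
    // then confirms the stored row matches the modified one.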
    #[tokio::test]
    async fn test_page_data_update() {
        let table = get_table();

        let mut row = RowData::new(
            table.sql_type.clone(),
            TransactionId::new(0xDEADBEEF),
            None,
            get_item_pointer(0),
            SqlTuple(vec![
                Some(BaseSqlTypes::Text("this is a test".to_string())),
                None,
                Some(BaseSqlTypes::Text("blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah".to_string())),
            ]),
        );

        let mut pd = PageData::new(PageOffset(0));
        let rip = pd.insert(row.min, &table, row.user_data.clone());
        assert!(rip.is_ok());

        let ip = rip.unwrap();

        row.item_pointer = get_item_pointer(1);

        assert!(pd.update(row.clone(), ip.count).is_ok());

        pin_mut!(pd);
        let result_rows: Vec<RowData> = pd.get_stream().collect().await;
        assert_eq!(row, result_rows[0]);
    }
}