feophantlib/engine/io/row_formats/
row_data.rs

//! Encodes / decodes a row into a byte array based on the supplied specification.
//! The format is taken from: https://www.postgresql.org/docs/current/storage-page-layout.html
//! As always, I'm only implementing what I need and will extend it once I need more.
4use super::super::super::objects::Table;
5use super::super::super::transactions::TransactionId;
6use super::null_mask::NullMaskError;
7use super::{InfoMask, ItemPointer, ItemPointerError, NullMask};
8use crate::engine::io::format_traits::{Parseable, Serializable};
9use crate::engine::io::{ConstEncodedSize, EncodedSize, SelfEncodedSize};
10use crate::engine::objects::types::{BaseSqlTypes, BaseSqlTypesError, SqlTypeDefinition};
11use crate::engine::objects::SqlTuple;
12use bytes::{Buf, BufMut};
13use std::fmt;
14use std::mem::size_of;
15use std::sync::Arc;
16use thiserror::Error;
17
18/// Holds information about a particular row in a table as well as metadata.
19#[derive(Clone, Debug, PartialEq)]
20pub struct RowData {
21    ///Type that defines the row
22    pub sql_type: Arc<SqlTypeDefinition>,
23    ///Lowest transaction this row is valid for (still need to check that transaction's status)
24    pub min: TransactionId,
25    ///Max transaction this row is valid for OR None for still valid (still need to check max's status)
26    pub max: Option<TransactionId>,
27    ///Page + Offset where this row is stored on disk
28    pub item_pointer: ItemPointer,
29    ///Columns stored in this row
30    pub user_data: SqlTuple,
31}
32
33impl RowData {
34    pub fn new(
35        sql_type: Arc<SqlTypeDefinition>,
36        min: TransactionId,
37        max: Option<TransactionId>,
38        item_pointer: ItemPointer,
39        user_data: SqlTuple,
40    ) -> RowData {
41        RowData {
42            sql_type,
43            min,
44            max,
45            item_pointer,
46            user_data,
47        }
48    }
49
50    pub fn get_column(&self, name: &str) -> Result<Option<BaseSqlTypes>, RowDataError> {
51        for i in 0..self.sql_type.len() {
52            if self.sql_type[i].0 == *name {
53                return Ok(self.user_data.0[i].clone());
54            }
55        }
56
57        Err(RowDataError::ColumnDoesNotExist(name.to_string()))
58    }
59
60    pub fn get_column_not_null(&self, name: &str) -> Result<BaseSqlTypes, RowDataError> {
61        self.get_column(name)?
62            .ok_or_else(|| RowDataError::UnexpectedNull(name.to_string()))
63    }
64
65    pub fn parse(table: Arc<Table>, row_buffer: &mut impl Buf) -> Result<RowData, RowDataError> {
66        if row_buffer.remaining() < TransactionId::encoded_size() {
67            return Err(RowDataError::MissingMinData(
68                size_of::<TransactionId>(),
69                row_buffer.remaining(),
70            ));
71        }
72        let min = TransactionId::new(row_buffer.get_u64_le());
73
74        if row_buffer.remaining() < TransactionId::encoded_size() {
75            return Err(RowDataError::MissingMaxData(
76                size_of::<TransactionId>(),
77                row_buffer.remaining(),
78            ));
79        }
80        let max_temp = row_buffer.get_u64_le();
81        let max = match max_temp {
82            0 => None,
83            _ => Some(TransactionId::new(max_temp)),
84        };
85
86        let item_pointer = ItemPointer::parse(row_buffer)?;
87
88        let null_mask = RowData::get_null_mask(table.clone(), row_buffer)?;
89
90        let mut user_data = SqlTuple(vec![]);
91        for (column, mask) in table.attributes.iter().zip(null_mask.iter()) {
92            if *mask {
93                user_data.0.push(None);
94            } else {
95                user_data.0.push(Some(BaseSqlTypes::deserialize(
96                    &column.sql_type,
97                    row_buffer,
98                )?));
99            }
100        }
101
102        Ok(RowData::new(
103            table.sql_type.clone(),
104            min,
105            max,
106            item_pointer,
107            user_data,
108        ))
109    }
110
111    //Gets the null mask, if it doesn't exist it will return a vector of all not nulls
112    fn get_null_mask(
113        table: Arc<Table>,
114        row_buffer: &mut impl Buf,
115    ) -> Result<Vec<bool>, RowDataError> {
116        if row_buffer.remaining() < size_of::<InfoMask>() {
117            return Err(RowDataError::MissingInfoMaskData(
118                size_of::<TransactionId>(),
119                row_buffer.remaining(),
120            ));
121        }
122
123        let mask = InfoMask::from_bits_truncate(row_buffer.get_u8()); //Ignoring unused bits
124        if !mask.contains(InfoMask::HAS_NULL) {
125            return Ok(vec![false; table.attributes.len()]);
126        }
127
128        let columns_rounded = (table.attributes.len() + 7) / 8; //From https://users.rust-lang.org/t/solved-rust-round-usize-to-nearest-multiple-of-8/25549
129        if row_buffer.remaining() < columns_rounded {
130            return Err(RowDataError::MissingNullMaskData(
131                columns_rounded,
132                row_buffer.remaining(),
133            ));
134        }
135
136        let mut null_mask_raw = row_buffer.copy_to_bytes(columns_rounded);
137        Ok(NullMask::parse(&mut null_mask_raw, table.attributes.len())?)
138    }
139}
140
141impl fmt::Display for RowData {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        writeln!(f, "RowData")?;
144        writeln!(f, "\tType: {}", self.sql_type)?;
145        writeln!(f, "\tMin Tran: {}", self.min)?;
146        match self.max {
147            Some(m) => writeln!(f, "\tMax Tran: {}", m),
148            None => writeln!(f, "\tMax Tran: Unset"),
149        }?;
150        writeln!(f, "\t{}", self.item_pointer)?;
151        for column in &self.user_data.0 {
152            match column {
153                Some(c) => writeln!(f, "\t{}", c),
154                None => writeln!(f, "\tNull"),
155            }?;
156        }
157        Ok(())
158    }
159}
160
161impl EncodedSize<&SqlTuple> for RowData {
162    fn encoded_size(input: &SqlTuple) -> usize {
163        let mut size = size_of::<u64>()
164            + size_of::<u64>()
165            + ItemPointer::encoded_size()
166            + InfoMask::encoded_size()
167            + input.encoded_size();
168
169        if input.iter().any(|i| i.is_none()) {
170            size += NullMask::encoded_size(input);
171        }
172
173        size
174    }
175}
176
177impl Serializable for RowData {
178    fn serialize(&self, buffer: &mut impl BufMut) {
179        buffer.put_u64_le(self.min.get_u64());
180        buffer.put_u64_le(self.max.unwrap_or_else(|| TransactionId::new(0)).get_u64());
181        self.item_pointer.serialize(buffer);
182
183        //If there is null we add it to the flags and write a nullmask
184        let mut mask = InfoMask::empty();
185        if self.user_data.iter().any(|x| x.is_none()) {
186            mask = InfoMask::HAS_NULL;
187            buffer.put_u8(mask.bits());
188            buffer.put(NullMask::serialize(&self.user_data));
189        } else {
190            buffer.put_u8(mask.bits());
191        }
192
193        self.user_data.serialize(buffer);
194    }
195}
196
197#[derive(Debug, Error)]
198pub enum RowDataError {
199    #[error(transparent)]
200    BaseSqlTypes(#[from] BaseSqlTypesError),
201    #[error("Table definition length {0} does not match columns passed {1}")]
202    TableRowSizeMismatch(usize, usize),
203    #[error("Not enough min data need {0} got {1}")]
204    MissingMinData(usize, usize),
205    #[error("Not enough max data need {0} got {1}")]
206    MissingMaxData(usize, usize),
207    #[error("Not enough infomask data need {0} got {1}")]
208    MissingInfoMaskData(usize, usize),
209    #[error("Not enough null mask data need {0} got {1}")]
210    MissingNullMaskData(usize, usize),
211    #[error(transparent)]
212    NullMaskError(#[from] NullMaskError),
213    #[error(transparent)]
214    ItemPointerError(#[from] ItemPointerError),
215    #[error("Column named {0} does not exist")]
216    ColumnDoesNotExist(String),
217    #[error("Column null when ask not to be {0}")]
218    UnexpectedNull(String),
219}
220
#[cfg(test)]
mod tests {
    use bytes::BytesMut;

    use crate::constants::Nullable;
    use crate::engine::io::page_formats::PageOffset;
    use crate::engine::objects::types::BaseSqlTypesMapper;

    use super::super::super::super::objects::Attribute;
    use super::super::super::page_formats::UInt12;
    use super::*;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Item pointer shared by every test row (page 0, offset 0).
    fn get_item_pointer() -> ItemPointer {
        ItemPointer::new(PageOffset(0), UInt12::new(0).unwrap())
    }

    /// Shorthand for an attribute without the optional last argument.
    fn attribute(name: &str, sql_type: BaseSqlTypesMapper, nullable: Nullable) -> Attribute {
        Attribute::new(name.to_string(), sql_type, nullable, None)
    }

    /// Builds a throwaway table named `test_table` with the given attributes.
    fn table_with(attributes: Vec<Attribute>) -> Arc<Table> {
        Arc::new(Table::new(
            uuid::Uuid::new_v4(),
            "test_table".to_string(),
            attributes,
            vec![],
            vec![],
        ))
    }

    /// Builds a row for `table` holding `columns`.
    fn row_for(table: &Arc<Table>, columns: Vec<Option<BaseSqlTypes>>) -> RowData {
        RowData::new(
            table.sql_type.clone(),
            TransactionId::new(1),
            None,
            get_item_pointer(),
            SqlTuple(columns),
        )
    }

    /// Serializes a row, parses it back, asserts both are equal and hands the
    /// parsed row back for further checks.
    fn assert_round_trip(
        table: Arc<Table>,
        columns: Vec<Option<BaseSqlTypes>>,
    ) -> Result<RowData, Box<dyn std::error::Error>> {
        let test = row_for(&table, columns);

        let mut buffer = BytesMut::new();
        test.serialize(&mut buffer);
        let mut buffer = buffer.freeze();

        let test_parse = RowData::parse(table, &mut buffer)?;
        assert_eq!(test, test_parse);

        Ok(test_parse)
    }

    #[test]
    fn test_row_data_single_text() -> TestResult {
        let table = table_with(vec![attribute(
            "header",
            BaseSqlTypesMapper::Text,
            Nullable::NotNull,
        )]);

        assert_round_trip(
            table,
            vec![Some(BaseSqlTypes::Text("this is a test".to_string()))],
        )?;

        Ok(())
    }

    #[test]
    fn test_row_data_double_text() -> TestResult {
        let table = table_with(vec![
            attribute("header", BaseSqlTypesMapper::Text, Nullable::NotNull),
            attribute("header2", BaseSqlTypesMapper::Text, Nullable::NotNull),
        ]);

        assert_round_trip(
            table,
            vec![
                Some(BaseSqlTypes::Text("this is a test".to_string())),
                Some(BaseSqlTypes::Text("this is not a test".to_string())),
            ],
        )?;

        Ok(())
    }

    #[test]
    fn test_row_uuid_roundtrip() -> TestResult {
        let table = table_with(vec![attribute(
            "header",
            BaseSqlTypesMapper::Uuid,
            Nullable::NotNull,
        )]);

        assert_round_trip(
            table,
            vec![Some(BaseSqlTypes::Uuid(uuid::Uuid::new_v4()))],
        )?;

        Ok(())
    }

    #[test]
    fn test_row_uuid_double_roundtrip() -> TestResult {
        let table = table_with(vec![
            attribute("header", BaseSqlTypesMapper::Uuid, Nullable::NotNull),
            attribute("header2", BaseSqlTypesMapper::Uuid, Nullable::NotNull),
        ]);

        assert_round_trip(
            table,
            vec![
                Some(BaseSqlTypes::Uuid(uuid::Uuid::new_v4())),
                Some(BaseSqlTypes::Uuid(uuid::Uuid::new_v4())),
            ],
        )?;

        Ok(())
    }

    #[test]
    fn test_row_uuid_double_opt_roundtrip() -> TestResult {
        let table = table_with(vec![
            attribute("header", BaseSqlTypesMapper::Uuid, Nullable::NotNull),
            attribute("header2", BaseSqlTypesMapper::Uuid, Nullable::Null),
        ]);

        assert_round_trip(
            table,
            vec![Some(BaseSqlTypes::Uuid(uuid::Uuid::new_v4())), None],
        )?;

        Ok(())
    }

    #[test]
    fn test_row_complex_data_roundtrip() -> TestResult {
        let table = table_with(vec![
            attribute("header", BaseSqlTypesMapper::Text, Nullable::NotNull),
            attribute("id", BaseSqlTypesMapper::Uuid, Nullable::Null),
            attribute(
                "header3",
                BaseSqlTypesMapper::Array(Arc::new(BaseSqlTypesMapper::Integer)),
                Nullable::NotNull,
            ),
        ]);

        let test_parse = assert_round_trip(
            table,
            vec![
                Some(BaseSqlTypes::Text("this is a test".to_string())),
                None,
                Some(BaseSqlTypes::Array(vec![
                    BaseSqlTypes::Integer(1),
                    BaseSqlTypes::Integer(2),
                ])),
            ],
        )?;

        let column_val = test_parse.get_column_not_null("header")?;
        assert_eq!(column_val, BaseSqlTypes::Text("this is a test".to_string()));

        Ok(())
    }

    #[test]
    fn test_get_column_lookup_errors() {
        let table = table_with(vec![
            attribute("header", BaseSqlTypesMapper::Text, Nullable::NotNull),
            attribute("id", BaseSqlTypesMapper::Uuid, Nullable::Null),
        ]);
        let row = row_for(
            &table,
            vec![Some(BaseSqlTypes::Text("this is a test".to_string())), None],
        );

        // A null column is a successful lookup that yields None ...
        assert!(matches!(row.get_column("id"), Ok(None)));
        // ... unless the caller asked for a non-null value.
        assert!(matches!(
            row.get_column_not_null("id"),
            Err(RowDataError::UnexpectedNull(_))
        ));
        assert!(matches!(
            row.get_column("missing"),
            Err(RowDataError::ColumnDoesNotExist(_))
        ));
    }

    #[test]
    fn test_encoded_size() {
        let tuple = SqlTuple(vec![Some(BaseSqlTypes::Uuid(uuid::Uuid::new_v4())), None]);
        match size_of::<usize>() {
            4 => assert_eq!(40, RowData::encoded_size(&tuple)), //Not 100% certain if correct
            8 => assert_eq!(44, RowData::encoded_size(&tuple)),
            _ => panic!("You're on your own on this arch."),
        }
    }
}