use bytes::Bytes;
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::thrift::{
DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
};
use crate::file::statistics::{Statistics, page_stats_to_thrift};
/// A page in one of three forms: a data page, a version-2 data page, or a
/// dictionary page.
///
/// Every variant carries a byte buffer, a value count and an encoding, which
/// are exposed uniformly through the accessor methods on `Page`.
#[derive(Clone, Debug)]
pub enum Page {
/// Data page; reported as `PageType::DATA_PAGE` by `Page::page_type`.
DataPage {
/// Bytes of the page, returned by `Page::buffer`.
buf: Bytes,
/// Value count, returned by `Page::num_values`.
num_values: u32,
/// Encoding returned by `Page::encoding`.
encoding: Encoding,
/// Encoding recorded as the header's `definition_level_encoding`.
def_level_encoding: Encoding,
/// Encoding recorded as the header's `repetition_level_encoding`.
rep_level_encoding: Encoding,
/// Optional statistics for the page.
statistics: Option<Statistics>,
},
/// Version-2 data page; reported as `PageType::DATA_PAGE_V2`.
DataPageV2 {
/// Bytes of the page, returned by `Page::buffer`.
buf: Bytes,
/// Value count, returned by `Page::num_values`.
num_values: u32,
/// Encoding returned by `Page::encoding`.
encoding: Encoding,
/// Recorded as the header's `num_nulls`.
num_nulls: u32,
/// Recorded as the header's `num_rows`.
num_rows: u32,
/// Recorded as the header's `definition_levels_byte_length`.
def_levels_byte_len: u32,
/// Recorded as the header's `repetition_levels_byte_length`.
rep_levels_byte_len: u32,
/// Recorded as the header's `is_compressed` flag.
is_compressed: bool,
/// Optional statistics for the page.
statistics: Option<Statistics>,
},
/// Dictionary page; reported as `PageType::DICTIONARY_PAGE`.
DictionaryPage {
/// Bytes of the page, returned by `Page::buffer`.
buf: Bytes,
/// Value count, returned by `Page::num_values`.
num_values: u32,
/// Encoding returned by `Page::encoding`.
encoding: Encoding,
/// Recorded as the header's `is_sorted` flag.
is_sorted: bool,
},
}
impl Page {
    /// Returns the [`PageType`] tag that corresponds to this variant.
    pub fn page_type(&self) -> PageType {
        match self {
            Self::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
            Self::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
            Self::DataPage { .. } => PageType::DATA_PAGE,
        }
    }

    /// Returns `true` for `DataPage` and `DataPageV2`, `false` for `DictionaryPage`.
    pub fn is_data_page(&self) -> bool {
        match self {
            Self::DataPage { .. } | Self::DataPageV2 { .. } => true,
            Self::DictionaryPage { .. } => false,
        }
    }

    /// Returns `true` only for `DictionaryPage`.
    pub fn is_dictionary_page(&self) -> bool {
        match self {
            Self::DictionaryPage { .. } => true,
            Self::DataPage { .. } | Self::DataPageV2 { .. } => false,
        }
    }

    /// Borrows the byte buffer held by this page, whichever variant it is.
    pub fn buffer(&self) -> &Bytes {
        // Every variant has a `buf` field, so a single irrefutable or-pattern covers them all.
        let (Self::DataPage { buf, .. }
        | Self::DataPageV2 { buf, .. }
        | Self::DictionaryPage { buf, .. }) = self;
        buf
    }

    /// Returns the `num_values` field of this page.
    pub fn num_values(&self) -> u32 {
        let (Self::DataPage { num_values, .. }
        | Self::DataPageV2 { num_values, .. }
        | Self::DictionaryPage { num_values, .. }) = self;
        *num_values
    }

    /// Returns the `encoding` field of this page.
    pub fn encoding(&self) -> Encoding {
        let (Self::DataPage { encoding, .. }
        | Self::DataPageV2 { encoding, .. }
        | Self::DictionaryPage { encoding, .. }) = self;
        *encoding
    }

    /// Borrows the page statistics, if present.
    ///
    /// Dictionary pages have no statistics field, so they always yield `None`.
    pub fn statistics(&self) -> Option<&Statistics> {
        match self {
            Self::DataPage { statistics, .. } | Self::DataPageV2 { statistics, .. } => {
                statistics.as_ref()
            }
            Self::DictionaryPage { .. } => None,
        }
    }
}
/// Pairs a [`Page`] with a separately recorded uncompressed size.
///
/// The compressed size is not stored; `compressed_size()` derives it from the
/// length of the wrapped page's buffer.
pub struct CompressedPage {
// The wrapped page; its buffer length is what `compressed_size()` reports.
compressed_page: Page,
// Size supplied by the caller of `new`, reported by `uncompressed_size()`.
uncompressed_size: usize,
}
impl CompressedPage {
    /// Wraps `compressed_page` together with the caller-supplied `uncompressed_size`.
    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
        Self {
            compressed_page,
            uncompressed_size,
        }
    }

    /// Returns the page type of the wrapped page.
    pub fn page_type(&self) -> PageType {
        self.compressed_page.page_type()
    }

    /// Borrows the wrapped page.
    pub fn compressed_page(&self) -> &Page {
        &self.compressed_page
    }

    /// Returns the uncompressed size given to [`CompressedPage::new`].
    pub fn uncompressed_size(&self) -> usize {
        self.uncompressed_size
    }

    /// Returns the length in bytes of the wrapped page's buffer.
    pub fn compressed_size(&self) -> usize {
        self.compressed_page.buffer().len()
    }

    /// Returns the value count of the wrapped page.
    pub fn num_values(&self) -> u32 {
        self.compressed_page.num_values()
    }

    /// Returns the encoding of the wrapped page.
    pub fn encoding(&self) -> Encoding {
        self.compressed_page.encoding()
    }

    /// Borrows the wrapped page's buffer as a byte slice.
    pub fn data(&self) -> &[u8] {
        self.compressed_page.buffer()
    }

    /// Builds the thrift [`PageHeader`] describing this page.
    ///
    /// Exactly one of `data_page_header`, `data_page_header_v2` or
    /// `dictionary_page_header` is populated, according to the page variant;
    /// `crc` and `index_page_header` are always left as `None`.
    ///
    /// # Errors
    ///
    /// The header stores sizes and counts as `i32`. An error is returned if the
    /// uncompressed size, the compressed size, or any of the unsigned counts
    /// (`num_values`, `num_nulls`, `num_rows`, level byte lengths) exceeds
    /// `i32::MAX`, instead of silently wrapping to a negative number.
    pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
        // Narrows an unsigned page count to the header's signed 32-bit field,
        // reporting which field overflowed.
        fn count_to_i32(value: u32, what: &str) -> Result<i32> {
            i32::try_from(value).map_err(|_| general_err!("Page {} overflow: {}", what, value))
        }

        let uncompressed_size = self.uncompressed_size();
        let compressed_size = self.compressed_size();
        // Sizes are validated first, uncompressed before compressed, so the
        // reported error is the same one callers saw previously.
        let uncompressed_page_size = i32::try_from(uncompressed_size).map_err(|_| {
            general_err!("Page uncompressed size overflow: {}", uncompressed_size)
        })?;
        let compressed_page_size = i32::try_from(compressed_size)
            .map_err(|_| general_err!("Page compressed size overflow: {}", compressed_size))?;

        let num_values = count_to_i32(self.num_values(), "num_values")?;
        let encoding = self.encoding();
        let page_type = self.page_type();

        let mut page_header = PageHeader {
            r#type: page_type,
            uncompressed_page_size,
            compressed_page_size,
            crc: None,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            data_page_header_v2: None,
        };

        match self.compressed_page {
            Page::DataPage {
                def_level_encoding,
                rep_level_encoding,
                ref statistics,
                ..
            } => {
                let data_page_header = DataPageHeader {
                    num_values,
                    encoding,
                    definition_level_encoding: def_level_encoding,
                    repetition_level_encoding: rep_level_encoding,
                    statistics: page_stats_to_thrift(statistics.as_ref()),
                };
                page_header.data_page_header = Some(data_page_header);
            }
            Page::DataPageV2 {
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                ref statistics,
                ..
            } => {
                let data_page_header_v2 = DataPageHeaderV2 {
                    num_values,
                    num_nulls: count_to_i32(num_nulls, "num_nulls")?,
                    num_rows: count_to_i32(num_rows, "num_rows")?,
                    encoding,
                    definition_levels_byte_length: count_to_i32(
                        def_levels_byte_len,
                        "definition levels byte length"
                    )?,
                    repetition_levels_byte_length: count_to_i32(
                        rep_levels_byte_len,
                        "repetition levels byte length"
                    )?,
                    is_compressed: Some(is_compressed),
                    statistics: page_stats_to_thrift(statistics.as_ref()),
                };
                page_header.data_page_header_v2 = Some(data_page_header_v2);
            }
            Page::DictionaryPage { is_sorted, .. } => {
                let dictionary_page_header = DictionaryPageHeader {
                    num_values,
                    encoding,
                    is_sorted: Some(is_sorted),
                };
                page_header.dictionary_page_header = Some(dictionary_page_header);
            }
        }
        Ok(page_header)
    }

    /// Consumes the page and returns it with its buffer replaced by `new_buffer`.
    ///
    /// All other fields, including the recorded uncompressed size, are kept.
    #[cfg(feature = "encryption")]
    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
        match &mut self.compressed_page {
            Page::DataPage { buf, .. }
            | Page::DataPageV2 { buf, .. }
            | Page::DictionaryPage { buf, .. } => *buf = new_buffer,
        }
        self
    }
}
/// Summary of a written page, as returned by `PageWriter::write_page`.
pub struct PageWriteSpec {
/// Type of the page.
pub page_type: PageType,
/// Uncompressed size of the page.
pub uncompressed_size: usize,
/// Compressed size of the page.
pub compressed_size: usize,
/// Value count of the page.
pub num_values: u32,
/// Offset of the page; presumably a byte position in the output - TODO confirm against writers.
pub offset: u64,
/// Number of bytes written; presumably for this page - TODO confirm against writers.
pub bytes_written: u64,
}
impl Default for PageWriteSpec {
fn default() -> Self {
Self::new()
}
}
impl PageWriteSpec {
pub fn new() -> Self {
Self {
page_type: PageType::DATA_PAGE,
uncompressed_size: 0,
compressed_size: 0,
num_values: 0,
offset: 0,
bytes_written: 0,
}
}
}
/// Lightweight description of a page, obtainable from a thrift `PageHeader`
/// through the `TryFrom<&PageHeader>` conversion and returned by
/// `PageReader::peek_next_page`.
#[derive(Clone)]
pub struct PageMetadata {
/// Row count; the header conversion fills this only for `DATA_PAGE_V2`
/// headers (from `num_rows`) and leaves it `None` otherwise.
pub num_rows: Option<usize>,
/// Taken from the data page header's `num_values` by the header conversion;
/// `None` for dictionary pages.
pub num_levels: Option<usize>,
/// `true` when the page is a dictionary page.
pub is_dict: bool,
}
/// Derives [`PageMetadata`] from a thrift page header.
///
/// * `DATA_PAGE`: `num_levels` comes from the data page header's `num_values`.
/// * `DATA_PAGE_V2`: `num_rows` and `num_levels` come from the v2 header's
///   `num_rows` and `num_values`.
/// * `DICTIONARY_PAGE`: only `is_dict` is set.
///
/// # Errors
///
/// Returns `ParquetError::General` when the page type is none of the above,
/// when the sub-header matching the declared page type is absent, or when a
/// count in the header is negative. The latter two cases previously panicked
/// or wrapped to a very large `usize`, respectively.
impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
    type Error = ParquetError;

    fn try_from(
        value: &crate::file::metadata::thrift::PageHeader,
    ) -> std::result::Result<Self, Self::Error> {
        // Header counts are signed; a negative one cannot be a valid count.
        fn count(raw: i32, field: &str) -> std::result::Result<usize, ParquetError> {
            usize::try_from(raw).map_err(|_| {
                ParquetError::General(format!("invalid {field} in page header: {raw}"))
            })
        }

        match value.r#type {
            PageType::DATA_PAGE => {
                // The header is external input, so a missing sub-header is an
                // error to report rather than an invariant to unwrap.
                let header = value.data_page_header.as_ref().ok_or_else(|| {
                    ParquetError::General(
                        "page header of type DATA_PAGE is missing data_page_header".to_string(),
                    )
                })?;
                Ok(PageMetadata {
                    num_rows: None,
                    num_levels: Some(count(header.num_values, "num_values")?),
                    is_dict: false,
                })
            }
            PageType::DICTIONARY_PAGE => Ok(PageMetadata {
                num_rows: None,
                num_levels: None,
                is_dict: true,
            }),
            PageType::DATA_PAGE_V2 => {
                let header = value.data_page_header_v2.as_ref().ok_or_else(|| {
                    ParquetError::General(
                        "page header of type DATA_PAGE_V2 is missing data_page_header_v2"
                            .to_string(),
                    )
                })?;
                Ok(PageMetadata {
                    num_rows: Some(count(header.num_rows, "num_rows")?),
                    num_levels: Some(count(header.num_values, "num_values")?),
                    is_dict: false,
                })
            }
            other => Err(ParquetError::General(format!(
                "page type {other:?} cannot be converted to PageMetadata"
            ))),
        }
    }
}
/// A source of [`Page`]s that can also peek at, or skip, the upcoming page.
///
/// Implementors must additionally be `Send` and an `Iterator` over `Result<Page>`.
pub trait PageReader: Iterator<Item = Result<Page>> + Send {
    /// Produces the next page, or `Ok(None)` if there is none.
    fn get_next_page(&mut self) -> Result<Option<Page>>;

    /// Produces the [`PageMetadata`] of the upcoming page, or `Ok(None)` if there is none.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

    /// Skips the upcoming page.
    fn skip_next_page(&mut self) -> Result<()>;

    /// Reports whether the reader is positioned at a record boundary.
    ///
    /// The default implementation peeks at the upcoming page: it answers `true`
    /// when there is no further page, and otherwise answers `true` only if the
    /// page's metadata carries a row count (`num_rows` is `Some`). Errors from
    /// [`PageReader::peek_next_page`] are propagated.
    fn at_record_boundary(&mut self) -> Result<bool> {
        let upcoming = self.peek_next_page()?;
        let at_boundary = match upcoming {
            Some(metadata) => metadata.num_rows.is_some(),
            None => true,
        };
        Ok(at_boundary)
    }
}
/// A sink for [`CompressedPage`]s. Implementors must be `Send`.
pub trait PageWriter: Send {
/// Writes `page`, taking ownership of it, and returns a [`PageWriteSpec`]
/// describing the write.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
/// Closes the writer; presumably flushes or finalizes any underlying
/// resource - the exact behavior is implementation-defined.
fn close(&mut self) -> Result<()>;
}
/// Marker trait for a `Send` iterator that yields boxed [`PageReader`]s; it
/// declares no methods of its own.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Statistics fixture used by the data page tests.
    fn sample_statistics() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds a v1 data page over the bytes `[0, 1, 2]` holding 10 PLAIN-encoded values.
    fn sample_data_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    /// Asserts the accessors whose expected values are the same for every fixture page.
    fn assert_common_accessors(page: &Page, expected_type: PageType) {
        assert_eq!(page.page_type(), expected_type);
        assert_eq!(page.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(page.num_values(), 10);
        assert_eq!(page.encoding(), Encoding::PLAIN);
    }

    #[test]
    fn test_page() {
        // v1 data page
        let data_page = sample_data_page(Some(sample_statistics()));
        assert_common_accessors(&data_page, PageType::DATA_PAGE);
        assert_eq!(data_page.statistics(), Some(&sample_statistics()));

        // v2 data page
        let data_page_v2 = Page::DataPageV2 {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_statistics()),
        };
        assert_common_accessors(&data_page_v2, PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.statistics(), Some(&sample_statistics()));

        // dictionary page: never exposes statistics
        let dict_page = Page::DictionaryPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_common_accessors(&dict_page, PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(sample_data_page(Some(sample_statistics())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        // The compressed size is the buffer length, not the recorded uncompressed size.
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // One past the largest size representable in the header's i32 field.
        let too_large = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(sample_data_page(None), too_large);

        let result = cpage.to_thrift_header();
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Page uncompressed size overflow"));
    }
}