parquet2/write/
page.rs

1use std::convert::TryInto;
2use std::io::Write;
3use std::sync::Arc;
4
5#[cfg(feature = "async")]
6use futures::{AsyncWrite, AsyncWriteExt};
7#[cfg(feature = "async")]
8use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;
9
10use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
11use parquet_format_safe::{DictionaryPageHeader, Encoding, PageType};
12
13use crate::compression::Compression;
14use crate::error::{Error, Result};
15use crate::page::{
16    CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
17};
18use crate::statistics::Statistics;
19
20pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {
21    page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V2
22}
23
24fn maybe_bytes(uncompressed: usize, compressed: usize) -> Result<(i32, i32)> {
25    let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
26        Error::oos(format!(
27            "A page can only contain i32::MAX uncompressed bytes. This one contains {}",
28            uncompressed
29        ))
30    })?;
31
32    let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
33        Error::oos(format!(
34            "A page can only contain i32::MAX compressed bytes. This one contains {}",
35            compressed
36        ))
37    })?;
38
39    Ok((uncompressed_page_size, compressed_page_size))
40}
41
42/// Contains page write metrics.
43pub struct PageWriteSpec {
44    pub header: ParquetPageHeader,
45    pub num_values: usize,
46    pub num_rows: Option<usize>,
47    pub header_size: u64,
48    pub offset: u64,
49    pub bytes_written: u64,
50    pub compression: Compression,
51    pub statistics: Option<Arc<dyn Statistics>>,
52}
53
54pub fn write_page<W: Write>(
55    writer: &mut W,
56    offset: u64,
57    compressed_page: &CompressedPage,
58) -> Result<PageWriteSpec> {
59    let num_values = compressed_page.num_values();
60    let selected_rows = compressed_page.selected_rows();
61
62    let header = match &compressed_page {
63        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
64        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
65    }?;
66
67    let header_size = write_page_header(writer, &header)?;
68    let mut bytes_written = header_size;
69
70    bytes_written += match &compressed_page {
71        CompressedPage::Data(compressed_page) => {
72            writer.write_all(&compressed_page.buffer)?;
73            compressed_page.buffer.len() as u64
74        }
75        CompressedPage::Dict(compressed_page) => {
76            writer.write_all(&compressed_page.buffer)?;
77            compressed_page.buffer.len() as u64
78        }
79    };
80
81    let statistics = match &compressed_page {
82        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
83        CompressedPage::Dict(_) => None,
84    };
85
86    Ok(PageWriteSpec {
87        header,
88        header_size,
89        offset,
90        bytes_written,
91        compression: compressed_page.compression(),
92        statistics,
93        num_rows: selected_rows.map(|x| x.last().unwrap().length),
94        num_values,
95    })
96}
97
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// Asynchronously writes `compressed_page` into `writer`: first its serialized
/// page header, then the page's buffer.
///
/// `offset` is not used to position the writer; it is stored unchanged in the
/// returned [`PageWriteSpec`].
///
/// # Errors
/// Returns an error when the page header cannot be assembled (a size or count
/// does not fit in an `i32`), when writing to `writer` fails, or when the
/// page's statistics evaluate to an error.
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> Result<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let selected_rows = compressed_page.selected_rows();

    let header = match compressed_page {
        CompressedPage::Data(page) => assemble_data_page_header(page),
        CompressedPage::Dict(page) => assemble_dict_page_header(page),
    }?;

    let header_size = write_page_header_async(writer, &header).await?;

    // Both page kinds are written the same way: the raw buffer follows the header.
    let buffer: &[u8] = match compressed_page {
        CompressedPage::Data(page) => &page.buffer,
        CompressedPage::Dict(page) => &page.buffer,
    };
    writer.write_all(buffer).await?;
    let bytes_written = header_size + buffer.len() as u64;

    // Only data pages carry statistics.
    let statistics = match compressed_page {
        CompressedPage::Data(page) => page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    // The row count is the length of the last selected interval. An empty
    // selection has no last interval, so it is reported as `None` instead of
    // panicking.
    let num_rows = selected_rows.and_then(|rows| rows.last().map(|interval| interval.length));

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_rows,
        num_values,
    })
}
143
144fn assemble_data_page_header(page: &CompressedDataPage) -> Result<ParquetPageHeader> {
145    let (uncompressed_page_size, compressed_page_size) =
146        maybe_bytes(page.uncompressed_size(), page.compressed_size())?;
147
148    let mut page_header = ParquetPageHeader {
149        type_: match page.header() {
150            DataPageHeader::V1(_) => PageType::DATA_PAGE,
151            DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
152        },
153        uncompressed_page_size,
154        compressed_page_size,
155        crc: None,
156        data_page_header: None,
157        index_page_header: None,
158        dictionary_page_header: None,
159        data_page_header_v2: None,
160    };
161
162    match page.header() {
163        DataPageHeader::V1(header) => {
164            page_header.data_page_header = Some(header.clone());
165        }
166        DataPageHeader::V2(header) => {
167            page_header.data_page_header_v2 = Some(header.clone());
168        }
169    }
170    Ok(page_header)
171}
172
173fn assemble_dict_page_header(page: &CompressedDictPage) -> Result<ParquetPageHeader> {
174    let (uncompressed_page_size, compressed_page_size) =
175        maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;
176
177    let num_values: i32 = page.num_values.try_into().map_err(|_| {
178        Error::oos(format!(
179            "A dictionary page can only contain i32::MAX items. This one contains {}",
180            page.num_values
181        ))
182    })?;
183
184    Ok(ParquetPageHeader {
185        type_: PageType::DICTIONARY_PAGE,
186        uncompressed_page_size,
187        compressed_page_size,
188        crc: None,
189        data_page_header: None,
190        index_page_header: None,
191        dictionary_page_header: Some(DictionaryPageHeader {
192            num_values,
193            encoding: Encoding::PLAIN,
194            is_sorted: None,
195        }),
196        data_page_header_v2: None,
197    })
198}
199
200/// writes the page header into `writer`, returning the number of bytes used in the process.
201fn write_page_header<W: Write>(mut writer: &mut W, header: &ParquetPageHeader) -> Result<u64> {
202    let mut protocol = TCompactOutputProtocol::new(&mut writer);
203    Ok(header.write_to_out_protocol(&mut protocol)? as u64)
204}
205
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// Asynchronously serializes `header` into `writer` using the thrift compact
/// stream protocol.
///
/// Returns the number of bytes the serialized header occupies.
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> Result<u64> {
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    let written = header.write_to_out_stream_protocol(&mut protocol).await?;
    Ok(written as u64)
}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Smallest `usize` that no longer fits in an `i32`.
    const OVER_I32_MAX: usize = i32::MAX as usize + 1;

    /// Builds an uncompressed dictionary page with an empty buffer; only the
    /// third and fourth constructor arguments vary between tests.
    // NOTE(review): argument names are inferred from how the tests use them — confirm against `CompressedDictPage::new`.
    fn empty_dict_page(uncompressed_page_size: usize, num_values: usize) -> CompressedDictPage {
        CompressedDictPage::new(
            vec![],
            Compression::Uncompressed,
            uncompressed_page_size,
            num_values,
            false,
        )
    }

    #[test]
    fn dict_too_large() {
        let page = empty_dict_page(OVER_I32_MAX, 100);
        assert!(assemble_dict_page_header(&page).is_err());
    }

    #[test]
    fn dict_too_many_values() {
        let page = empty_dict_page(0, OVER_I32_MAX);
        assert!(assemble_dict_page_header(&page).is_err());
    }
}
244}