1use std::convert::TryInto;
2use std::io::Write;
3use std::sync::Arc;
4
5#[cfg(feature = "async")]
6use futures::{AsyncWrite, AsyncWriteExt};
7#[cfg(feature = "async")]
8use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;
9
10use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
11use parquet_format_safe::{DictionaryPageHeader, Encoding, PageType};
12
13use crate::compression::Compression;
14use crate::error::{Error, Result};
15use crate::page::{
16 CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
17};
18use crate::statistics::Statistics;
19
20pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {
21 page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V2
22}
23
24fn maybe_bytes(uncompressed: usize, compressed: usize) -> Result<(i32, i32)> {
25 let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
26 Error::oos(format!(
27 "A page can only contain i32::MAX uncompressed bytes. This one contains {}",
28 uncompressed
29 ))
30 })?;
31
32 let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
33 Error::oos(format!(
34 "A page can only contain i32::MAX compressed bytes. This one contains {}",
35 compressed
36 ))
37 })?;
38
39 Ok((uncompressed_page_size, compressed_page_size))
40}
41
/// Summary of a single page as produced by [`write_page`].
pub struct PageWriteSpec {
    /// The page header that was serialized in front of the page buffer.
    pub header: ParquetPageHeader,
    /// Number of values reported by the page that was written.
    pub num_values: usize,
    /// Length of the last selected-rows interval of the page, or `None` when the
    /// page reports no selected rows.
    pub num_rows: Option<usize>,
    /// Number of bytes the serialized header occupies.
    pub header_size: u64,
    /// The offset handed to the write function by the caller, stored as given
    /// (presumably the page's position in the output; confirm against callers).
    pub offset: u64,
    /// Total bytes written for the page: header size plus page buffer length.
    pub bytes_written: u64,
    /// Compression reported by the page.
    pub compression: Compression,
    /// Statistics taken from a data page, if it has any; always `None` for
    /// dictionary pages.
    pub statistics: Option<Arc<dyn Statistics>>,
}
53
54pub fn write_page<W: Write>(
55 writer: &mut W,
56 offset: u64,
57 compressed_page: &CompressedPage,
58) -> Result<PageWriteSpec> {
59 let num_values = compressed_page.num_values();
60 let selected_rows = compressed_page.selected_rows();
61
62 let header = match &compressed_page {
63 CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
64 CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
65 }?;
66
67 let header_size = write_page_header(writer, &header)?;
68 let mut bytes_written = header_size;
69
70 bytes_written += match &compressed_page {
71 CompressedPage::Data(compressed_page) => {
72 writer.write_all(&compressed_page.buffer)?;
73 compressed_page.buffer.len() as u64
74 }
75 CompressedPage::Dict(compressed_page) => {
76 writer.write_all(&compressed_page.buffer)?;
77 compressed_page.buffer.len() as u64
78 }
79 };
80
81 let statistics = match &compressed_page {
82 CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
83 CompressedPage::Dict(_) => None,
84 };
85
86 Ok(PageWriteSpec {
87 header,
88 header_size,
89 offset,
90 bytes_written,
91 compression: compressed_page.compression(),
92 statistics,
93 num_rows: selected_rows.map(|x| x.last().unwrap().length),
94 num_values,
95 })
96}
97
/// Asynchronously writes `compressed_page` to `writer`: first its header, then
/// its buffer.
///
/// `offset` is not used for writing; it is copied verbatim into the returned
/// [`PageWriteSpec`].
///
/// # Errors
/// Returns an error when the page header cannot be assembled (sizes or counts
/// that do not fit in an `i32`), when writing to `writer` fails, or when the
/// statistics of a data page cannot be obtained. Note that the statistics are
/// read after the bytes have been written.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> Result<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    // An empty selection yields `None` rather than panicking on `unwrap`.
    // NOTE(review): only the last interval's length is reported; confirm that
    // this (and not the sum over all intervals) is the intended row count.
    let num_rows = compressed_page
        .selected_rows()
        .and_then(|rows| rows.last().map(|interval| interval.length));

    let header = match compressed_page {
        CompressedPage::Data(page) => assemble_data_page_header(page),
        CompressedPage::Dict(page) => assemble_dict_page_header(page),
    }?;

    // Already a `u64`; no cast needed.
    let header_size = write_page_header_async(writer, &header).await?;

    // Both page kinds carry their payload in `buffer`; pick it once.
    let buffer: &[u8] = match compressed_page {
        CompressedPage::Data(page) => &page.buffer,
        CompressedPage::Dict(page) => &page.buffer,
    };
    writer.write_all(buffer).await?;
    let bytes_written = header_size + buffer.len() as u64;

    // Only data pages can carry statistics.
    let statistics = match compressed_page {
        CompressedPage::Data(page) => page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_rows,
        num_values,
    })
}
143
144fn assemble_data_page_header(page: &CompressedDataPage) -> Result<ParquetPageHeader> {
145 let (uncompressed_page_size, compressed_page_size) =
146 maybe_bytes(page.uncompressed_size(), page.compressed_size())?;
147
148 let mut page_header = ParquetPageHeader {
149 type_: match page.header() {
150 DataPageHeader::V1(_) => PageType::DATA_PAGE,
151 DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
152 },
153 uncompressed_page_size,
154 compressed_page_size,
155 crc: None,
156 data_page_header: None,
157 index_page_header: None,
158 dictionary_page_header: None,
159 data_page_header_v2: None,
160 };
161
162 match page.header() {
163 DataPageHeader::V1(header) => {
164 page_header.data_page_header = Some(header.clone());
165 }
166 DataPageHeader::V2(header) => {
167 page_header.data_page_header_v2 = Some(header.clone());
168 }
169 }
170 Ok(page_header)
171}
172
173fn assemble_dict_page_header(page: &CompressedDictPage) -> Result<ParquetPageHeader> {
174 let (uncompressed_page_size, compressed_page_size) =
175 maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;
176
177 let num_values: i32 = page.num_values.try_into().map_err(|_| {
178 Error::oos(format!(
179 "A dictionary page can only contain i32::MAX items. This one contains {}",
180 page.num_values
181 ))
182 })?;
183
184 Ok(ParquetPageHeader {
185 type_: PageType::DICTIONARY_PAGE,
186 uncompressed_page_size,
187 compressed_page_size,
188 crc: None,
189 data_page_header: None,
190 index_page_header: None,
191 dictionary_page_header: Some(DictionaryPageHeader {
192 num_values,
193 encoding: Encoding::PLAIN,
194 is_sorted: None,
195 }),
196 data_page_header_v2: None,
197 })
198}
199
200fn write_page_header<W: Write>(mut writer: &mut W, header: &ParquetPageHeader) -> Result<u64> {
202 let mut protocol = TCompactOutputProtocol::new(&mut writer);
203 Ok(header.write_to_out_protocol(&mut protocol)? as u64)
204}
205
/// Asynchronously serializes `header` to `writer` through a thrift compact
/// output stream protocol and returns the value reported by the serializer as
/// a `u64` (used by callers as the header size in bytes).
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> Result<u64> {
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    let written = header.write_to_out_stream_protocol(&mut protocol).await?;
    Ok(written as u64)
}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Header assembly must fail when the third constructor argument
    /// (presumably the uncompressed page size) is one past `i32::MAX`.
    #[test]
    fn dict_too_large() {
        let oversized = i32::MAX as usize + 1;
        let page = CompressedDictPage::new(
            vec![],
            Compression::Uncompressed,
            oversized,
            100,
            false,
        );
        let outcome = assemble_dict_page_header(&page);
        assert!(outcome.is_err());
    }

    /// Header assembly must fail when the fourth constructor argument
    /// (presumably the number of values) is one past `i32::MAX`.
    #[test]
    fn dict_too_many_values() {
        let too_many = i32::MAX as usize + 1;
        let page = CompressedDictPage::new(
            vec![],
            Compression::Uncompressed,
            0,
            too_many,
            false,
        );
        let outcome = assemble_dict_page_header(&page);
        assert!(outcome.is_err());
    }
}