parquet2/write/indexes/
serialize.rs

1use parquet_format_safe::BoundaryOrder;
2use parquet_format_safe::ColumnIndex;
3use parquet_format_safe::OffsetIndex;
4use parquet_format_safe::PageLocation;
5
6use crate::error::{Error, Result};
7pub use crate::metadata::KeyValue;
8use crate::statistics::serialize_statistics;
9
10use crate::write::page::{is_data_page, PageWriteSpec};
11
12pub fn serialize_column_index(pages: &[PageWriteSpec]) -> Result<ColumnIndex> {
13    let mut null_pages = Vec::with_capacity(pages.len());
14    let mut min_values = Vec::with_capacity(pages.len());
15    let mut max_values = Vec::with_capacity(pages.len());
16    let mut null_counts = Vec::with_capacity(pages.len());
17
18    pages
19        .iter()
20        .filter(|x| is_data_page(x))
21        .try_for_each(|spec| {
22            if let Some(stats) = &spec.statistics {
23                let stats = serialize_statistics(stats.as_ref());
24
25                let null_count = stats
26                    .null_count
27                    .ok_or_else(|| Error::oos("null count of a page is required"))?;
28                null_counts.push(null_count);
29
30                if let Some(min_value) = stats.min_value {
31                    min_values.push(min_value);
32                    max_values.push(
33                        stats
34                            .max_value
35                            .ok_or_else(|| Error::oos("max value of a page is required"))?,
36                    );
37                    null_pages.push(false)
38                } else {
39                    min_values.push(vec![0]);
40                    max_values.push(vec![0]);
41                    null_pages.push(true)
42                }
43
44                Result::Ok(())
45            } else {
46                Err(Error::oos(
47                    "options were set to write statistics but some pages miss them",
48                ))
49            }
50        })?;
51    Ok(ColumnIndex {
52        null_pages,
53        min_values,
54        max_values,
55        boundary_order: BoundaryOrder::UNORDERED,
56        null_counts: Some(null_counts),
57    })
58}
59
60pub fn serialize_offset_index(pages: &[PageWriteSpec]) -> Result<OffsetIndex> {
61    let mut first_row_index = 0;
62    let page_locations = pages
63        .iter()
64        .filter(|x| is_data_page(x))
65        .map(|spec| {
66            let location = PageLocation {
67                offset: spec.offset.try_into()?,
68                compressed_page_size: spec.bytes_written.try_into()?,
69                first_row_index,
70            };
71            let num_rows = spec.num_rows.ok_or_else(|| {
72                Error::oos(
73                    "options were set to write statistics but some data pages miss number of rows",
74                )
75            })?;
76            first_row_index += num_rows as i64;
77            Ok(location)
78        })
79        .collect::<Result<Vec<_>>>()?;
80
81    Ok(OffsetIndex { page_locations })
82}