1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use crate::{error::Error, metadata::get_sort_order};
use super::{column_order::ColumnOrder, schema_descriptor::SchemaDescriptor, RowGroupMetaData};
use parquet_format_async_temp::ColumnOrder as TColumnOrder;
pub use parquet_format_async_temp::KeyValue;
/// File-level metadata, built from the thrift `FileMetaData` by
/// `FileMetaData::try_from_thrift` and converted back by `FileMetaData::into_thrift`.
#[derive(Debug, Clone)]
pub struct FileMetaData {
/// Version value, copied unchanged to and from the thrift metadata.
pub version: i32,
/// Row count reported by the thrift metadata, converted from its signed integer form.
pub num_rows: usize,
/// Optional `created_by` string carried over from the thrift metadata
/// (presumably identifies the writer application; confirm against the format spec).
pub created_by: Option<String>,
/// Metadata of each row group, in the order they appear in the thrift metadata.
pub row_groups: Vec<RowGroupMetaData>,
/// Optional key-value pairs carried over unchanged from the thrift metadata.
pub key_value_metadata: Option<Vec<KeyValue>>,
/// Schema descriptor parsed from the thrift schema.
pub schema_descr: SchemaDescriptor,
/// Column orders parsed from the thrift metadata, paired positionally with the
/// schema's columns; `None` when the thrift metadata carried no column orders.
pub column_orders: Option<Vec<ColumnOrder>>,
}
impl FileMetaData {
    /// Returns the schema descriptor of this file.
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema_descr
    }

    /// Returns the optional key-value metadata of this file.
    pub fn key_value_metadata(&self) -> &Option<Vec<KeyValue>> {
        &self.key_value_metadata
    }

    /// Returns the column order of the column at position `i`.
    ///
    /// Returns [`ColumnOrder::Undefined`] when the file carries no column orders,
    /// or when `i` is past the end of the stored list. The stored list can be
    /// shorter than the schema's column list because `parse_column_orders` pairs
    /// both sequences with `zip`, so an out-of-range index is reported as
    /// "undefined" instead of panicking.
    pub fn column_order(&self, i: usize) -> ColumnOrder {
        self.column_orders
            .as_ref()
            .and_then(|orders| orders.get(i).copied())
            .unwrap_or(ColumnOrder::Undefined)
    }

    /// Builds a [`FileMetaData`] from its thrift representation.
    ///
    /// # Errors
    ///
    /// Returns an error when the schema cannot be converted, when any row group
    /// fails to convert, or when `num_rows` does not fit in a `usize`
    /// (for example, when it is negative).
    pub fn try_from_thrift(
        metadata: parquet_format_async_temp::FileMetaData,
    ) -> Result<Self, Error> {
        // The schema is parsed first: row groups and column orders both need it.
        let schema_descr = SchemaDescriptor::try_from_thrift(&metadata.schema)?;

        let row_groups = metadata
            .row_groups
            .into_iter()
            .map(|rg| RowGroupMetaData::try_from_thrift(&schema_descr, rg))
            .collect::<Result<Vec<_>, Error>>()?;

        let column_orders = metadata
            .column_orders
            .map(|orders| parse_column_orders(&orders, &schema_descr));

        Ok(FileMetaData {
            version: metadata.version,
            num_rows: metadata.num_rows.try_into()?,
            created_by: metadata.created_by,
            row_groups,
            key_value_metadata: metadata.key_value_metadata,
            schema_descr,
            column_orders,
        })
    }

    /// Converts this [`FileMetaData`] into its thrift representation.
    ///
    /// The thrift `column_orders`, `encryption_algorithm` and
    /// `footer_signing_key_metadata` fields are always written as `None`.
    pub fn into_thrift(self) -> parquet_format_async_temp::FileMetaData {
        parquet_format_async_temp::FileMetaData {
            version: self.version,
            schema: self.schema_descr.into_thrift(),
            num_rows: self.num_rows as i64,
            row_groups: self
                .row_groups
                .into_iter()
                .map(|v| v.into_thrift())
                .collect(),
            key_value_metadata: self.key_value_metadata,
            created_by: self.created_by,
            // NOTE(review): `self.column_orders` is not serialized back, so a
            // thrift round-trip drops any column orders that were read.
            column_orders: None,
            encryption_algorithm: None,
            footer_signing_key_metadata: None,
        }
    }
}
/// Converts thrift column orders into [`ColumnOrder`]s.
///
/// Each thrift order is paired with the schema column at the same position.
/// Pairing stops at the shorter of the two sequences, so the result has
/// `min(orders.len(), number of schema columns)` entries.
fn parse_column_orders(
    orders: &[TColumnOrder],
    schema_descr: &SchemaDescriptor,
) -> Vec<ColumnOrder> {
    let pairs = schema_descr.columns().iter().zip(orders);
    let mut parsed = Vec::with_capacity(pairs.size_hint().0);

    for (column, order) in pairs {
        let primitive = &column.descriptor.primitive_type;
        let column_order = match order {
            // The sort order of a type-defined order is derived from the column's types.
            TColumnOrder::TYPEORDER(_) => ColumnOrder::TypeDefinedOrder(get_sort_order(
                &primitive.logical_type,
                &primitive.converted_type,
                &primitive.physical_type,
            )),
        };
        parsed.push(column_order);
    }

    parsed
}