#[cfg(feature = "encryption")]
use crate::encryption::{
encrypt::{
encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
},
modules::{create_footer_aad, create_module_aad, ModuleType},
};
#[cfg(feature = "encryption")]
use crate::errors::ParquetError;
use crate::errors::Result;
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::page_index::index::Index;
use crate::file::writer::{get_file_magic, TrackedWrite};
use crate::format::EncryptionAlgorithm;
#[cfg(feature = "encryption")]
use crate::format::{AesGcmV1, ColumnCryptoMetaData};
use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
use crate::schema::types;
use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
use crate::thrift::TSerializable;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::TCompactOutputProtocol;
/// Serializes page indexes and the file footer for a set of thrift
/// [`RowGroup`]s into a [`TrackedWrite`].
///
/// Constructed with `new`, optionally configured through the `with_*`
/// builder methods, and consumed by `finish`.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
/// Destination writer; its running byte count is read to compute the
/// offsets and lengths recorded in the metadata.
buf: &'a mut TrackedWrite<W>,
/// Root schema type, converted to its thrift form when the footer is built.
schema: &'a TypePtr,
/// Schema descriptor; its column count determines how many column orders
/// are emitted in the footer.
schema_descr: &'a SchemaDescPtr,
/// Row groups to describe in the footer. Their column chunks are updated in
/// place with the location of any page index that gets written.
row_groups: Vec<RowGroup>,
/// Optional column indexes, addressed as `[row_group][column]`.
column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
/// Optional offset indexes, addressed as `[row_group][column]`.
offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
/// Optional key/value pairs stored in the footer.
key_value_metadata: Option<Vec<KeyValue>>,
/// Optional `created_by` string stored in the footer.
created_by: Option<String>,
/// Performs the actual object serialization (see `MetadataObjectWriter`).
object_writer: MetadataObjectWriter,
/// Value stored in the footer's `version` field.
writer_version: i32,
}
impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
let start_offset = self.buf.bytes_written();
self.object_writer.write_offset_index(
offset_index,
column_metadata,
row_group_idx,
column_idx,
&mut self.buf,
)?;
let end_offset = self.buf.bytes_written();
column_metadata.offset_index_offset = Some(start_offset as i64);
column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
}
}
}
Ok(())
}
fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
let start_offset = self.buf.bytes_written();
self.object_writer.write_column_index(
column_index,
column_metadata,
row_group_idx,
column_idx,
&mut self.buf,
)?;
let end_offset = self.buf.bytes_written();
column_metadata.column_index_offset = Some(start_offset as i64);
column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
}
}
}
Ok(())
}
pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
if let Some(column_indexes) = self.column_indexes {
self.write_column_indexes(column_indexes)?;
}
if let Some(offset_indexes) = self.offset_indexes {
self.write_offset_indexes(offset_indexes)?;
}
let column_orders = (0..self.schema_descr.num_columns())
.map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
.collect();
let column_orders = Some(column_orders);
let (row_groups, unencrypted_row_groups) = self
.object_writer
.apply_row_group_encryption(self.row_groups)?;
let (encryption_algorithm, footer_signing_key_metadata) =
self.object_writer.get_plaintext_footer_crypto_metadata();
let mut file_metadata = FileMetaData {
num_rows,
row_groups,
key_value_metadata: self.key_value_metadata.clone(),
version: self.writer_version,
schema: types::to_thrift(self.schema.as_ref())?,
created_by: self.created_by.clone(),
column_orders,
encryption_algorithm,
footer_signing_key_metadata,
};
let start_pos = self.buf.bytes_written();
self.object_writer
.write_file_metadata(&file_metadata, &mut self.buf)?;
let end_pos = self.buf.bytes_written();
let metadata_len = (end_pos - start_pos) as u32;
self.buf.write_all(&metadata_len.to_le_bytes())?;
self.buf.write_all(self.object_writer.get_file_magic())?;
if let Some(row_groups) = unencrypted_row_groups {
file_metadata.row_groups = row_groups;
}
Ok(file_metadata)
}
pub fn new(
buf: &'a mut TrackedWrite<W>,
schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
row_groups: Vec<RowGroup>,
created_by: Option<String>,
writer_version: i32,
) -> Self {
Self {
buf,
schema,
schema_descr,
row_groups,
column_indexes: None,
offset_indexes: None,
key_value_metadata: None,
created_by,
object_writer: Default::default(),
writer_version,
}
}
pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
self.column_indexes = Some(column_indexes);
self
}
pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
self.offset_indexes = Some(offset_indexes);
self
}
pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
self.key_value_metadata = Some(key_value_metadata);
self
}
#[cfg(feature = "encryption")]
pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
self
}
}
/// Writes the page indexes and footer described by a [`ParquetMetaData`]
/// to an underlying writer.
///
/// Create it with `new` (or `new_with_tracked`) and call `finish` to perform
/// the write.
pub struct ParquetMetaDataWriter<'a, W: Write> {
/// Destination writer, wrapped so the number of bytes written is tracked.
buf: TrackedWrite<W>,
/// Metadata to serialize.
metadata: &'a ParquetMetaData,
}
impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
Self::new_with_tracked(TrackedWrite::new(buf), metadata)
}
pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
Self { buf, metadata }
}
pub fn finish(mut self) -> Result<()> {
let file_metadata = self.metadata.file_metadata();
let schema = Arc::new(file_metadata.schema().clone());
let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
let created_by = file_metadata.created_by().map(str::to_string);
let row_groups = self
.metadata
.row_groups()
.iter()
.map(|rg| rg.to_thrift())
.collect::<Vec<_>>();
let key_value_metadata = file_metadata.key_value_metadata().cloned();
let column_indexes = self.convert_column_indexes();
let offset_indexes = self.convert_offset_index();
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
&schema,
&schema_descr,
row_groups,
created_by,
file_metadata.version(),
);
encoder = encoder.with_column_indexes(&column_indexes);
encoder = encoder.with_offset_indexes(&offset_indexes);
if let Some(key_value_metadata) = key_value_metadata {
encoder = encoder.with_key_value_metadata(key_value_metadata);
}
encoder.finish()?;
Ok(())
}
fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
if let Some(row_group_column_indexes) = self.metadata.column_index() {
(0..self.metadata.row_groups().len())
.map(|rg_idx| {
let column_indexes = &row_group_column_indexes[rg_idx];
column_indexes
.iter()
.map(|column_index| match column_index {
Index::NONE => None,
Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
Some(column_index.to_thrift())
}
Index::FLOAT(column_index) => Some(column_index.to_thrift()),
Index::INT32(column_index) => Some(column_index.to_thrift()),
Index::INT64(column_index) => Some(column_index.to_thrift()),
Index::INT96(column_index) => Some(column_index.to_thrift()),
})
.collect()
})
.collect()
} else {
self.metadata
.row_groups()
.iter()
.map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
.collect()
}
}
fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
(0..self.metadata.row_groups().len())
.map(|rg_idx| {
let offset_indexes = &row_group_offset_indexes[rg_idx];
offset_indexes
.iter()
.map(|offset_index| Some(offset_index.to_thrift()))
.collect()
})
.collect()
} else {
self.metadata
.row_groups()
.iter()
.map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
.collect()
}
}
}
/// Serializes thrift metadata objects to a writer.
///
/// With the `encryption` feature enabled it carries an optional file
/// encryptor; without the feature it has no fields.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
/// Encryptor to apply when writing objects; `None` means plain output.
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
}
impl MetadataObjectWriter {
    /// Writes `object` to `sink` through a thrift compact output protocol,
    /// without any encryption.
    #[inline]
    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
        object.write_to_out_protocol(&mut TCompactOutputProtocol::new(sink))?;
        Ok(())
    }
}
/// Implementations used when the `encryption` feature is disabled: every
/// object is written as plain thrift and no crypto metadata is produced.
#[cfg(not(feature = "encryption"))]
impl MetadataObjectWriter {
    /// Returns the file magic to write after the footer length.
    pub fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic()
    }

    /// Always `(None, None)`: there is no encryption algorithm or footer
    /// signing key metadata to store.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        let encryption_algorithm = None;
        let footer_signing_key_metadata = None;
        (encryption_algorithm, footer_signing_key_metadata)
    }

    /// Returns the row groups unchanged, with no separate unencrypted copy.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        let unencrypted_copy = None;
        Ok((row_groups, unencrypted_copy))
    }

    /// Writes the footer metadata as plain thrift.
    fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
        Self::write_object(file_metadata, sink)
    }

    /// Writes a column index as plain thrift; the column chunk and the
    /// position arguments are ignored.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        _chunk: &ColumnChunk,
        _rg_idx: usize,
        _col_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        Self::write_object(column_index, sink)
    }

    /// Writes an offset index as plain thrift; the column chunk and the
    /// position arguments are ignored.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        _chunk: &ColumnChunk,
        _rg_idx: usize,
        _col_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        Self::write_object(offset_index, sink)
    }
}
/// Implementations used when the `encryption` feature is enabled. Each
/// method falls back to plain thrift output when no file encryptor is set.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Replaces the file encryptor; `None` disables encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the footer metadata in one of three forms:
    ///
    /// * encrypted footer: the file crypto metadata in plain thrift followed
    ///   by the encrypted footer, when the encryptor's properties request
    ///   footer encryption;
    /// * signed plaintext footer: when an encryptor is present and the
    ///   metadata carries an encryption algorithm;
    /// * plain thrift otherwise.
    fn write_file_metadata(
        &self,
        file_metadata: &FileMetaData,
        mut sink: impl Write,
    ) -> Result<()> {
        let Some(file_encryptor) = self.file_encryptor.as_ref() else {
            return Self::write_object(file_metadata, &mut sink);
        };

        if file_encryptor.properties().encrypt_footer() {
            // The crypto metadata itself is never encrypted.
            let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
            Self::write_object(&crypto_metadata, &mut sink)?;

            let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
            let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
            return encrypt_object(
                file_metadata,
                &mut footer_encryptor,
                &mut sink,
                &footer_aad,
            );
        }

        if file_metadata.encryption_algorithm.is_some() {
            let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
            let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
            return write_signed_plaintext_object(
                file_metadata,
                &mut footer_encryptor,
                &mut sink,
                &footer_aad,
            );
        }

        Self::write_object(file_metadata, &mut sink)
    }

    /// Writes an offset index, encrypting it when an encryptor is set and the
    /// column is encrypted.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        if let Some(file_encryptor) = &self.file_encryptor {
            Self::write_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            )
        } else {
            Self::write_object(offset_index, sink)
        }
    }

    /// Writes a column index, encrypting it when an encryptor is set and the
    /// column is encrypted.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        if let Some(file_encryptor) = &self.file_encryptor {
            Self::write_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            )
        } else {
            Self::write_object(column_index, sink)
        }
    }

    /// Returns `(row groups to write, original row groups)`.
    ///
    /// With an encryptor the first element is the encrypted form and the
    /// second is a copy taken before encryption; without one the input is
    /// returned unchanged and the second element is `None`.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        let Some(file_encryptor) = &self.file_encryptor else {
            return Ok((row_groups, None));
        };

        let plaintext_row_groups = row_groups.clone();
        let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
        Ok((encrypted_row_groups, Some(plaintext_row_groups)))
    }

    /// Returns the file magic selected by the encryption properties, if any.
    fn get_file_magic(&self) -> &[u8; 4] {
        let properties = self.file_encryptor.as_ref().map(|enc| enc.properties());
        get_file_magic(properties)
    }

    /// Writes `object` to `sink`, encrypted with the column's encryptor when
    /// the column identified by `column_metadata` is encrypted and as plain
    /// thrift otherwise.
    ///
    /// Fails if `column_metadata` carries no `meta_data`, since the column
    /// path is read from it.
    fn write_object_with_encryption(
        object: &impl TSerializable,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunk,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let chunk_meta = column_metadata.meta_data.as_ref().ok_or_else(|| {
            general_err!(
                "Column metadata not set for column {} when encrypting object",
                column_index
            )
        })?;
        let path_parts = &chunk_meta.path_in_schema;

        // A single-element path is used as-is; anything else is joined with
        // dots. `joined_path` only exists to own the joined string.
        let joined_path;
        let column_path: &String = match path_parts.as_slice() {
            [only_part] => only_part,
            _ => {
                joined_path = path_parts.join(".");
                &joined_path
            }
        };

        if !file_encryptor.is_column_encrypted(column_path) {
            return Self::write_object(object, sink);
        }

        let module_aad = create_module_aad(
            file_encryptor.file_aad(),
            module_type,
            row_group_index,
            column_index,
            None,
        )?;
        let mut column_encryptor = file_encryptor.get_column_encryptor(column_path)?;
        encrypt_object(object, &mut column_encryptor, &mut sink, &module_aad)
    }

    /// Returns the encryption algorithm and footer key metadata to embed in a
    /// plaintext footer, or `(None, None)` when there is no encryptor or the
    /// footer is encrypted.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if !file_encryptor.properties().encrypt_footer() => {
                let algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
                let key_metadata = file_encryptor.properties().footer_key_metadata().cloned();
                (Some(algorithm), key_metadata)
            }
            _ => (None, None),
        }
    }

    /// Builds the `AESGCMV1` algorithm description for `file_encryptor`.
    ///
    /// The AAD prefix is included only when the properties say to store it;
    /// `supply_aad_prefix` is set only when a prefix exists and is the
    /// negation of that storage flag.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let prefix_is_stored = properties.store_aad_prefix();

        let supply_aad_prefix = properties.aad_prefix().map(|_| !prefix_is_stored);
        let aad_prefix = properties
            .aad_prefix()
            .filter(|_| prefix_is_stored)
            .cloned();

        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
            aad_prefix,
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Builds the file crypto metadata written ahead of an encrypted footer.
    fn file_crypto_metadata(
        file_encryptor: &FileEncryptor,
    ) -> Result<crate::format::FileCryptoMetaData> {
        let encryption_algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
        let key_metadata = file_encryptor.properties().footer_key_metadata().cloned();
        Ok(crate::format::FileCryptoMetaData {
            encryption_algorithm,
            key_metadata,
        })
    }

    /// Applies `encrypt_column_chunk` to every column chunk of every row
    /// group, stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroup>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroup>> {
        let mut encrypted_row_groups = Vec::with_capacity(row_groups.len());
        for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
            let mut encrypted_columns = Vec::with_capacity(row_group.columns.len());
            for (col_idx, chunk) in row_group.columns.into_iter().enumerate() {
                let encrypted =
                    Self::encrypt_column_chunk(chunk, file_encryptor, rg_idx, col_idx)?;
                encrypted_columns.push(encrypted);
            }
            row_group.columns = encrypted_columns;
            encrypted_row_groups.push(row_group);
        }
        Ok(encrypted_row_groups)
    }

    /// Encrypts the metadata of a column chunk that uses its own column key.
    ///
    /// Chunks without crypto metadata, or marked as encrypted with the footer
    /// key, are returned unchanged. For a column-key chunk the plain
    /// `meta_data` is removed and its ciphertext is stored in
    /// `encrypted_column_metadata`.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunk,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunk> {
        let column_path = match column_chunk.crypto_metadata.as_ref() {
            None | Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
                return Ok(column_chunk);
            }
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
                col_key.path_in_schema.join(".")
            }
        };

        let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
        // Taking the metadata leaves `None` behind, so the plaintext is not
        // written alongside the ciphertext.
        let plain_meta_data = column_chunk
            .meta_data
            .take()
            .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
        let module_aad = create_module_aad(
            file_encryptor.file_aad(),
            ModuleType::ColumnMetaData,
            row_group_index,
            column_index,
            None,
        )?;
        let ciphertext =
            encrypt_object_to_vec(&plain_meta_data, &mut column_encryptor, &module_aad)?;
        column_chunk.encrypted_column_metadata = Some(ciphertext);
        debug_assert!(column_chunk.meta_data.is_none());

        Ok(column_chunk)
    }
}