use crate::file::metadata::thrift::FileMeta;
use crate::file::metadata::{
ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
};
use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
use crate::{
basic::ColumnOrder,
file::metadata::{FileMetaData, ParquetMetaDataBuilder},
};
#[cfg(feature = "encryption")]
use crate::{
encryption::{
encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
modules::{ModuleType, create_footer_aad, create_module_aad},
},
file::column_crypto_metadata::ColumnCryptoMetaData,
file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
};
use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
use crate::{
file::writer::{TrackedWrite, get_file_magic},
parquet_thrift::WriteThrift,
};
use crate::{
file::{
metadata::{KeyValue, ParquetMetaData},
page_index::offset_index::OffsetIndexMetaData,
},
parquet_thrift::ThriftCompactOutputProtocol,
};
use std::io::Write;
use std::sync::Arc;
/// Writes the page indexes and the thrift-encoded footer for a Parquet file.
///
/// Configure it with the `with_*` builder methods, then call `finish` to
/// write everything to `buf` and obtain the resulting [`ParquetMetaData`].
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
/// Destination writer; its `bytes_written()` counter is used to compute
/// the offsets and lengths recorded in the metadata.
buf: &'a mut TrackedWrite<W>,
/// Schema descriptor stored in the file metadata and used to derive the
/// per-column `ColumnOrder`s.
schema_descr: &'a SchemaDescPtr,
/// Row group metadata; page index offsets/lengths are written into the
/// contained column chunks before the footer is serialized.
row_groups: Vec<RowGroupMetaData>,
/// Optional column indexes, addressed as `[row_group][column]`.
column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
/// Optional offset indexes, addressed as `[row_group][column]`.
offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
/// Optional key/value pairs passed through to the file metadata.
key_value_metadata: Option<Vec<KeyValue>>,
/// Optional "created by" string passed through to the file metadata.
created_by: Option<String>,
/// Serializes individual metadata objects (with encryption when the
/// `encryption` feature is enabled and an encryptor is configured).
object_writer: MetadataObjectWriter,
/// Version value passed through to the file metadata.
writer_version: i32,
}
impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
fn write_offset_indexes(
&mut self,
offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
) -> Result<()> {
for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
let start_offset = self.buf.bytes_written();
self.object_writer.write_offset_index(
offset_index,
column_metadata,
row_group_idx,
column_idx,
&mut self.buf,
)?;
let end_offset = self.buf.bytes_written();
column_metadata.offset_index_offset = Some(start_offset as i64);
column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
}
}
}
Ok(())
}
fn write_column_indexes(
&mut self,
column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
) -> Result<()> {
for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
let start_offset = self.buf.bytes_written();
if self.object_writer.write_column_index(
column_index,
column_metadata,
row_group_idx,
column_idx,
&mut self.buf,
)? {
let end_offset = self.buf.bytes_written();
column_metadata.column_index_offset = Some(start_offset as i64);
column_metadata.column_index_length =
Some((end_offset - start_offset) as i32);
}
}
}
}
Ok(())
}
/// Takes the configured column indexes out of `self`, writes them, and
/// converts them to the in-memory [`ParquetColumnIndex`] form.
///
/// Returns `Ok(None)` when no column indexes were configured or when every
/// entry is `None`. Otherwise missing entries become
/// `ColumnIndexMetaData::NONE`.
fn finalize_column_indexes(&mut self) -> Result<Option<ParquetColumnIndex>> {
    let Some(indexes) = self.column_indexes.take() else {
        return Ok(None);
    };

    // Write first so the offsets end up in the column chunk metadata.
    self.write_column_indexes(&indexes)?;

    // If no row group / column chunk carries an index, report no index at all.
    let has_any_index = indexes.iter().flatten().any(Option::is_some);
    if !has_any_index {
        return Ok(None);
    }

    let converted = indexes
        .into_iter()
        .map(|row_group| {
            row_group
                .into_iter()
                .map(|index| index.unwrap_or(ColumnIndexMetaData::NONE))
                .collect()
        })
        .collect();
    Ok(Some(converted))
}
fn finalize_offset_indexes(&mut self) -> Result<Option<ParquetOffsetIndex>> {
let offset_indexes = std::mem::take(&mut self.offset_indexes);
if let Some(offset_indexes) = offset_indexes.as_ref() {
self.write_offset_indexes(offset_indexes)?;
}
let all_none = offset_indexes
.as_ref()
.is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none())));
let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
None
} else {
offset_indexes.map(|ovvi| {
ovvi.into_iter()
.map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
.collect()
})
};
Ok(offset_indexes)
}
pub fn finish(mut self) -> Result<ParquetMetaData> {
let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
let column_indexes = self.finalize_column_indexes()?;
let offset_indexes = self.finalize_offset_indexes()?;
let column_orders = self
.schema_descr
.columns()
.iter()
.map(|col| {
let sort_order = ColumnOrder::sort_order_for_type(
col.logical_type_ref(),
col.converted_type(),
col.physical_type(),
);
ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
})
.collect();
let column_orders = Some(column_orders);
let (row_groups, unencrypted_row_groups) = self
.object_writer
.apply_row_group_encryption(self.row_groups)?;
#[cfg(feature = "encryption")]
let (encryption_algorithm, footer_signing_key_metadata) =
self.object_writer.get_plaintext_footer_crypto_metadata();
#[cfg(feature = "encryption")]
let file_metadata = FileMetaData::new(
self.writer_version,
num_rows,
self.created_by,
self.key_value_metadata,
self.schema_descr.clone(),
column_orders,
)
.with_encryption_algorithm(encryption_algorithm)
.with_footer_signing_key_metadata(footer_signing_key_metadata);
#[cfg(not(feature = "encryption"))]
let file_metadata = FileMetaData::new(
self.writer_version,
num_rows,
self.created_by,
self.key_value_metadata,
self.schema_descr.clone(),
column_orders,
);
let file_meta = FileMeta {
file_metadata: &file_metadata,
row_groups: &row_groups,
};
let start_pos = self.buf.bytes_written();
self.object_writer
.write_file_metadata(&file_meta, &mut self.buf)?;
let end_pos = self.buf.bytes_written();
let metadata_len = (end_pos - start_pos) as u32;
self.buf.write_all(&metadata_len.to_le_bytes())?;
self.buf.write_all(self.object_writer.get_file_magic())?;
let builder = ParquetMetaDataBuilder::new(file_metadata)
.set_column_index(column_indexes)
.set_offset_index(offset_indexes);
Ok(match unencrypted_row_groups {
Some(rg) => builder.set_row_groups(rg).build(),
None => builder.set_row_groups(row_groups).build(),
})
}
pub fn new(
buf: &'a mut TrackedWrite<W>,
schema_descr: &'a SchemaDescPtr,
row_groups: Vec<RowGroupMetaData>,
created_by: Option<String>,
writer_version: i32,
) -> Self {
Self {
buf,
schema_descr,
row_groups,
column_indexes: None,
offset_indexes: None,
key_value_metadata: None,
created_by,
object_writer: Default::default(),
writer_version,
}
}
pub fn with_column_indexes(
mut self,
column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
) -> Self {
self.column_indexes = Some(column_indexes);
self
}
pub fn with_offset_indexes(
mut self,
offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
) -> Self {
self.offset_indexes = Some(offset_indexes);
self
}
pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
self.key_value_metadata = Some(key_value_metadata);
self
}
/// Configures the encryptor used by the object writer; `None` leaves the
/// output unencrypted.
#[cfg(feature = "encryption")]
pub fn with_file_encryptor(self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
    Self {
        object_writer: self.object_writer.with_file_encryptor(file_encryptor),
        ..self
    }
}
}
/// Writes an existing [`ParquetMetaData`] to a [`Write`] destination.
///
/// Calling `finish` writes, in order: any column and offset indexes held by
/// the metadata, the thrift-encoded footer, the 4-byte little-endian footer
/// length, and the file magic.
pub struct ParquetMetaDataWriter<'a, W: Write> {
/// Destination writer, wrapped so written bytes can be counted.
buf: TrackedWrite<W>,
/// The metadata to serialize.
metadata: &'a ParquetMetaData,
}
impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
/// Creates a writer that serializes `metadata` into `buf`.
///
/// `buf` is wrapped in a fresh [`TrackedWrite`]; use `new_with_tracked` to
/// supply an existing one.
pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
    let tracked = TrackedWrite::new(buf);
    Self::new_with_tracked(tracked, metadata)
}
/// Creates a writer that serializes `metadata` into an existing
/// [`TrackedWrite`], keeping whatever byte count it already holds.
pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
Self { buf, metadata }
}
pub fn finish(mut self) -> Result<()> {
let file_metadata = self.metadata.file_metadata();
let schema = Arc::new(file_metadata.schema().clone());
let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
let created_by = file_metadata.created_by().map(str::to_string);
let row_groups = self.metadata.row_groups.clone();
let key_value_metadata = file_metadata.key_value_metadata().cloned();
let column_indexes = self.convert_column_indexes();
let offset_indexes = self.convert_offset_index();
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
&schema_descr,
row_groups,
created_by,
file_metadata.version(),
);
if let Some(column_indexes) = column_indexes {
encoder = encoder.with_column_indexes(column_indexes);
}
if let Some(offset_indexes) = offset_indexes {
encoder = encoder.with_offset_indexes(offset_indexes);
}
if let Some(key_value_metadata) = key_value_metadata {
encoder = encoder.with_key_value_metadata(key_value_metadata);
}
encoder.finish()?;
Ok(())
}
/// Copies the metadata's column indexes into the `[row_group][column]`
/// `Option` form expected by `ThriftMetadataWriter`.
///
/// Returns `None` when the metadata has no column index. Every copied entry
/// is wrapped in `Some`. Panics if the column index has fewer entries than
/// there are row groups.
fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
    let per_row_group = self.metadata.column_index()?;
    let num_row_groups = self.metadata.row_groups().len();

    let converted = (0..num_row_groups)
        .map(|rg_idx| &per_row_group[rg_idx])
        .map(|row_group_indexes| row_group_indexes.iter().cloned().map(Some).collect())
        .collect();
    Some(converted)
}
/// Copies the metadata's offset indexes into the `[row_group][column]`
/// `Option` form expected by `ThriftMetadataWriter`.
///
/// Returns `None` when the metadata has no offset index. Every copied entry
/// is wrapped in `Some`. Panics if the offset index has fewer entries than
/// there are row groups.
fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
    let per_row_group = self.metadata.offset_index()?;
    let num_row_groups = self.metadata.row_groups().len();

    let converted = (0..num_row_groups)
        .map(|rg_idx| &per_row_group[rg_idx])
        .map(|row_group_indexes| row_group_indexes.iter().cloned().map(Some).collect())
        .collect();
    Some(converted)
}
}
/// Serializes individual metadata objects to thrift.
///
/// With the `encryption` feature enabled it can hold a file encryptor;
/// without the feature it has no fields.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
/// Encryptor to use, or `None` when no encryptor has been configured.
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
}
impl MetadataObjectWriter {
    /// Writes `object` to `sink` using the thrift compact protocol.
    #[inline]
    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
        object.write_thrift(&mut ThriftCompactOutputProtocol::new(sink))?;
        Ok(())
    }
}
/// Implementations used when the `encryption` feature is disabled:
/// everything is written as plain thrift.
#[cfg(not(feature = "encryption"))]
impl MetadataObjectWriter {
    /// Writes the file metadata to `sink`.
    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
        Self::write_thrift_object(file_metadata, sink)
    }

    /// Writes an offset index to `sink`. The column chunk and the indices
    /// are unused in this configuration.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        _column_chunk: &ColumnChunkMetaData,
        _row_group_idx: usize,
        _column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        Self::write_thrift_object(offset_index, sink)
    }

    /// Writes a column index to `sink`.
    ///
    /// Returns `Ok(false)` without writing anything for
    /// `ColumnIndexMetaData::NONE`, and `Ok(true)` once the index has been
    /// written.
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        _column_chunk: &ColumnChunkMetaData,
        _row_group_idx: usize,
        _column_idx: usize,
        sink: impl Write,
    ) -> Result<bool> {
        if matches!(column_index, ColumnIndexMetaData::NONE) {
            return Ok(false);
        }
        Self::write_thrift_object(column_index, sink)?;
        Ok(true)
    }

    /// Returns the row groups unchanged; the second element is always `None`.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        let untouched = (row_groups, None);
        Ok(untouched)
    }

    /// Returns the magic bytes that terminate the file.
    pub fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic()
    }
}
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
self.file_encryptor = encryptor;
self
}
/// Writes the file metadata to `sink` in one of three forms:
///
/// * no encryptor: plain thrift;
/// * encryptor with an encrypted footer: the file crypto metadata in plain
///   thrift, followed by the encrypted file metadata;
/// * encryptor with a plaintext footer whose metadata carries an encryption
///   algorithm: the metadata written via
///   `write_signed_plaintext_thrift_object`.
///
/// An encryptor with a plaintext footer and no encryption algorithm falls
/// back to plain thrift.
fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
    let Some(file_encryptor) = self.file_encryptor.as_ref() else {
        return Self::write_thrift_object(file_metadata, &mut sink);
    };

    if file_encryptor.properties().encrypt_footer() {
        // The crypto metadata precedes the encrypted footer and is itself
        // written unencrypted.
        let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
        {
            let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
            crypto_metadata.write_thrift(&mut protocol)?;
        }

        let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
        let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
        return encrypt_thrift_object(
            file_metadata,
            &mut footer_encryptor,
            &mut sink,
            &footer_aad,
        );
    }

    if file_metadata.file_metadata.encryption_algorithm.is_some() {
        let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
        let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
        return write_signed_plaintext_thrift_object(
            file_metadata,
            &mut footer_encryptor,
            &mut sink,
            &footer_aad,
        );
    }

    Self::write_thrift_object(file_metadata, &mut sink)
}
/// Writes an offset index to `sink`. When an encryptor is configured the
/// write goes through `write_thrift_object_with_encryption`; otherwise the
/// index is written as plain thrift.
fn write_offset_index(
    &self,
    offset_index: &OffsetIndexMetaData,
    column_chunk: &ColumnChunkMetaData,
    row_group_idx: usize,
    column_idx: usize,
    sink: impl Write,
) -> Result<()> {
    let Some(file_encryptor) = &self.file_encryptor else {
        return Self::write_thrift_object(offset_index, sink);
    };
    Self::write_thrift_object_with_encryption(
        offset_index,
        sink,
        file_encryptor,
        column_chunk,
        ModuleType::OffsetIndex,
        row_group_idx,
        column_idx,
    )
}
/// Writes a column index to `sink`. When an encryptor is configured the
/// write goes through `write_thrift_object_with_encryption`; otherwise the
/// index is written as plain thrift.
///
/// Returns `Ok(false)` without writing anything for
/// `ColumnIndexMetaData::NONE`, and `Ok(true)` once the index has been
/// written.
fn write_column_index(
    &self,
    column_index: &ColumnIndexMetaData,
    column_chunk: &ColumnChunkMetaData,
    row_group_idx: usize,
    column_idx: usize,
    sink: impl Write,
) -> Result<bool> {
    if matches!(column_index, ColumnIndexMetaData::NONE) {
        return Ok(false);
    }

    if let Some(file_encryptor) = &self.file_encryptor {
        Self::write_thrift_object_with_encryption(
            column_index,
            sink,
            file_encryptor,
            column_chunk,
            ModuleType::ColumnIndex,
            row_group_idx,
            column_idx,
        )?;
    } else {
        Self::write_thrift_object(column_index, sink)?;
    }
    Ok(true)
}
/// Encrypts the row group metadata when an encryptor is configured.
///
/// Returns `(row_groups_to_serialize, original_row_groups)`. Without an
/// encryptor the input is returned unchanged and the second element is
/// `None`; with one, the second element is a copy of the input taken before
/// encryption.
fn apply_row_group_encryption(
    &self,
    row_groups: Vec<RowGroupMetaData>,
) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
    let Some(file_encryptor) = &self.file_encryptor else {
        return Ok((row_groups, None));
    };

    let plaintext = row_groups.clone();
    let encrypted = Self::encrypt_row_groups(row_groups, file_encryptor)?;
    Ok((encrypted, Some(plaintext)))
}
/// Returns the magic bytes that terminate the file, chosen by the free
/// function `get_file_magic` from the encryptor's properties (if any).
fn get_file_magic(&self) -> &[u8; 4] {
    let encryption_properties = self
        .file_encryptor
        .as_ref()
        .map(|file_encryptor| file_encryptor.properties());
    get_file_magic(encryption_properties)
}
fn write_thrift_object_with_encryption(
object: &impl WriteThrift,
mut sink: impl Write,
file_encryptor: &FileEncryptor,
column_metadata: &ColumnChunkMetaData,
module_type: ModuleType,
row_group_index: usize,
column_index: usize,
) -> Result<()> {
let column_path_vec = column_metadata.column_path().as_ref();
let joined_column_path;
let column_path = if column_path_vec.len() == 1 {
&column_path_vec[0]
} else {
joined_column_path = column_path_vec.join(".");
&joined_column_path
};
if file_encryptor.is_column_encrypted(column_path) {
use crate::encryption::encrypt::encrypt_thrift_object;
let aad = create_module_aad(
file_encryptor.file_aad(),
module_type,
row_group_index,
column_index,
None,
)?;
let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
} else {
Self::write_thrift_object(object, sink)
}
}
/// Returns the encryption algorithm and footer key metadata to store in a
/// plaintext footer.
///
/// Both values are `None` unless an encryptor is configured and its
/// properties say the footer is not encrypted.
fn get_plaintext_footer_crypto_metadata(
    &self,
) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
    match self.file_encryptor.as_ref() {
        Some(file_encryptor) if !file_encryptor.properties().encrypt_footer() => {
            let algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
            let key_metadata = file_encryptor.properties().footer_key_metadata().cloned();
            (Some(algorithm), key_metadata)
        }
        _ => (None, None),
    }
}
/// Builds the `AES_GCM_V1` algorithm description for `file_encryptor`.
///
/// * `aad_prefix` is the configured prefix when the properties ask for it
///   to be stored, otherwise `None`.
/// * `supply_aad_prefix` is `None` when no prefix is configured, otherwise
///   `Some(true)` exactly when the prefix is not stored.
fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
    let properties = file_encryptor.properties();
    let store_prefix = properties.store_aad_prefix();
    let configured_prefix = properties.aad_prefix();

    let stored_prefix = configured_prefix.filter(|_| store_prefix).cloned();
    let supply_aad_prefix = configured_prefix.map(|_| !store_prefix);

    EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
        aad_prefix: stored_prefix,
        aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
        supply_aad_prefix,
    })
}
/// Builds the file crypto metadata: the encryption algorithm plus the
/// footer key metadata borrowed from the encryptor's properties.
fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
    let encryption_algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
    let key_metadata = file_encryptor
        .properties()
        .footer_key_metadata()
        .map(|v| v.as_slice());
    Ok(FileCryptoMetaData {
        encryption_algorithm,
        key_metadata,
    })
}
/// Runs `encrypt_column_chunk` over every column chunk of every row group,
/// passing each chunk its row group and column ordinal.
///
/// Stops at, and returns, the first error.
fn encrypt_row_groups(
    row_groups: Vec<RowGroupMetaData>,
    file_encryptor: &Arc<FileEncryptor>,
) -> Result<Vec<RowGroupMetaData>> {
    let mut encrypted_row_groups = Vec::with_capacity(row_groups.len());
    for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
        let plaintext_columns = std::mem::take(&mut row_group.columns);
        let mut columns = Vec::with_capacity(plaintext_columns.len());
        for (col_idx, column) in plaintext_columns.into_iter().enumerate() {
            let encrypted =
                Self::encrypt_column_chunk(column, file_encryptor, rg_idx, col_idx)?;
            columns.push(encrypted);
        }
        row_group.columns = columns;
        encrypted_row_groups.push(row_group);
    }
    Ok(encrypted_row_groups)
}
/// Attaches encrypted column metadata to `column_chunk` where required.
///
/// The encryptor is chosen from the chunk's crypto metadata:
///
/// * none: the chunk is returned unchanged;
/// * footer key: the footer encryptor, but only when the footer itself is
///   not encrypted; otherwise the chunk is returned unchanged;
/// * column key: the encryptor for the column's `"."`-joined schema path.
///
/// When an encryptor is chosen, the serialized column metadata is encrypted
/// with a `ColumnMetaData` module AAD and stored in
/// `encrypted_column_metadata`, and `plaintext_footer_mode` is set to whether
/// the footer is unencrypted.
fn encrypt_column_chunk(
    mut column_chunk: ColumnChunkMetaData,
    file_encryptor: &Arc<FileEncryptor>,
    row_group_index: usize,
    column_index: usize,
) -> Result<ColumnChunkMetaData> {
    use crate::file::metadata::thrift::serialize_column_meta_data;

    let footer_is_encrypted = file_encryptor.properties().encrypt_footer();

    let selected = match column_chunk.column_crypto_metadata.as_deref() {
        Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
            let column_path = col_key.path_in_schema.join(".");
            Some(file_encryptor.get_column_encryptor(&column_path)?)
        }
        Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) if !footer_is_encrypted => {
            Some(file_encryptor.get_footer_encryptor()?)
        }
        Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) | None => None,
    };
    let Some(mut encryptor) = selected else {
        return Ok(column_chunk);
    };

    let aad = create_module_aad(
        file_encryptor.file_aad(),
        ModuleType::ColumnMetaData,
        row_group_index,
        column_index,
        None,
    )?;

    // Serialize the column metadata into memory, then encrypt the bytes.
    let mut serialized: Vec<u8> = Vec::new();
    {
        let mut prot = ThriftCompactOutputProtocol::new(&mut serialized);
        serialize_column_meta_data(&column_chunk, &mut prot)?;
    }
    let ciphertext = encryptor.encrypt(&serialized, &aad)?;

    column_chunk.encrypted_column_metadata = Some(ciphertext);
    column_chunk.plaintext_footer_mode = !footer_is_encrypted;
    Ok(column_chunk)
}
}