use anyhow::{Context, Result};
use bytes::Bytes;
use datafusion::parquet::file::metadata::ParquetMetaDataReader;
use datafusion::{
arrow::{
array::{ListBuilder, StructBuilder, as_struct_array},
record_batch::RecordBatch,
},
parquet::file::metadata::ParquetMetaData,
};
use micromegas_tracing::prelude::*;
/// Builds a `RecordBatch` with zero columns and zero rows.
///
/// Goes through a `ListBuilder` wrapping an empty `StructBuilder` so the
/// resulting struct array carries a valid (empty) field layout, then converts
/// that struct array into a `RecordBatch`.
pub fn make_empty_record_batch() -> RecordBatch {
    let struct_builder = StructBuilder::from_fields([], 0);
    let mut list_builder = ListBuilder::new(struct_builder);
    let list_array = list_builder.finish();
    let inner_values = list_array.values();
    as_struct_array(inner_values).into()
}
/// Decodes a thrift-encoded `ParquetMetaData` payload from raw bytes.
///
/// # Errors
/// Returns an error (with context) when the bytes are not valid
/// thrift-encoded parquet metadata.
#[span_fn]
pub fn parse_parquet_metadata(bytes: &Bytes) -> Result<ParquetMetaData> {
    let metadata = ParquetMetaDataReader::decode_metadata(bytes)
        .with_context(|| "parsing ParquetMetaData")?;
    Ok(metadata)
}
/// Serializes `ParquetMetaData` and returns only the thrift-encoded metadata
/// payload, with the 8-byte footer (4-byte length + `PAR1` magic) stripped —
/// i.e. the exact input shape `parse_parquet_metadata` expects.
///
/// # Errors
/// Fails when the underlying writer errors out, or when the produced buffer
/// does not end with a well-formed parquet footer (too small, missing magic,
/// or a length field larger than the buffer).
pub fn serialize_parquet_metadata(pmd: &ParquetMetaData) -> Result<bytes::Bytes> {
    use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
    // Footer layout produced by the writer: [metadata][4-byte LE length][b"PAR1"]
    const FOOTER_SIZE: usize = 8;
    const LENGTH_SIZE: usize = 4;
    const PARQUET_MAGIC: &[u8] = b"PAR1";
    let mut buffer = Vec::new();
    let md_writer = ParquetMetaDataWriter::new(&mut buffer, pmd);
    md_writer
        .finish()
        .with_context(|| "serializing parquet metadata")?;
    let serialized = bytes::Bytes::from(buffer);
    if serialized.len() < FOOTER_SIZE {
        anyhow::bail!("Serialized metadata too small: {} bytes", serialized.len());
    }
    // Sanity-check the trailing magic so a writer format change or corrupt
    // output is caught here rather than yielding garbage metadata downstream.
    if &serialized[serialized.len() - PARQUET_MAGIC.len()..] != PARQUET_MAGIC {
        anyhow::bail!("Serialized metadata missing trailing PAR1 magic");
    }
    // The 4-byte little-endian metadata length sits just before the magic.
    let length_offset = serialized.len() - FOOTER_SIZE;
    let footer_len_bytes = &serialized[length_offset..length_offset + LENGTH_SIZE];
    let metadata_len = u32::from_le_bytes(
        footer_len_bytes
            .try_into()
            .with_context(|| "reading footer length")?,
    ) as usize;
    // checked_sub guards against a corrupt length field claiming more bytes
    // than the buffer actually holds.
    let footer_start = serialized
        .len()
        .checked_sub(FOOTER_SIZE + metadata_len)
        .with_context(|| {
            format!(
                "Invalid footer length: {} (total size: {})",
                metadata_len,
                serialized.len()
            )
        })?;
    // Zero-copy slice of just the metadata payload (footer excluded).
    let file_metadata_bytes = serialized.slice(footer_start..length_offset);
    Ok(file_metadata_bytes)
}