use anyhow::{Context, Result};
use bytes::Bytes;
use micromegas_tracing::prelude::*;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use super::metadata_cache::MetadataCache;
use crate::arrow_utils::parse_parquet_metadata;
use crate::lakehouse::metadata_compat;
use datafusion::parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
#[allow(deprecated)]
fn strip_column_index_info(metadata: ParquetMetaData) -> Result<ParquetMetaData> {
use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
use parquet::format::FileMetaData as ThriftFileMetaData;
use parquet::thrift::TSerializable;
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
let mut buffer = Vec::new();
let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
writer.finish()?;
let metadata_len = u32::from_le_bytes([
buffer[buffer.len() - 8],
buffer[buffer.len() - 7],
buffer[buffer.len() - 6],
buffer[buffer.len() - 5],
]) as usize;
let file_metadata_start = buffer.len() - 8 - metadata_len;
let file_metadata_bytes = &buffer[file_metadata_start..buffer.len() - 8];
let mut transport =
thrift::transport::TBufferChannel::with_capacity(file_metadata_bytes.len(), 0);
transport.set_readable_bytes(file_metadata_bytes);
let mut protocol = TCompactInputProtocol::new(transport);
let mut thrift_meta = ThriftFileMetaData::read_from_in_protocol(&mut protocol)
.context("parsing thrift metadata to strip column index")?;
for rg in thrift_meta.row_groups.iter_mut() {
for col in rg.columns.iter_mut() {
col.column_index_offset = None;
col.column_index_length = None;
col.offset_index_offset = None;
col.offset_index_length = None;
}
}
let mut modified_bytes: Vec<u8> = Vec::with_capacity(file_metadata_bytes.len() * 2);
let mut out_protocol = TCompactOutputProtocol::new(&mut modified_bytes);
thrift_meta
.write_to_out_protocol(&mut out_protocol)
.context("serializing modified thrift metadata")?;
out_protocol.flush()?;
ParquetMetaDataReader::decode_metadata(&Bytes::copy_from_slice(&modified_bytes))
.context("re-parsing metadata after stripping column index")
}
/// Loads the parquet metadata stored for `file_path`.
///
/// When `cache` is provided it is consulted first, and a hit is returned
/// without touching the database. Otherwise the `metadata` and
/// `partition_format_version` columns are read from `partition_metadata`:
///
/// * version `1` additionally reads `num_rows` from `lakehouse_partitions`
///   and goes through `metadata_compat::parse_legacy_and_upgrade`;
/// * version `2` is parsed with `parse_parquet_metadata`;
/// * any other version is rejected.
///
/// The parsed metadata is passed through `strip_column_index_info` before it
/// is wrapped in an `Arc`, inserted in the cache (if any) and returned.
///
/// # Errors
///
/// Returns an error if a query fails, a column cannot be decoded, the format
/// version is unsupported, or parsing / stripping the metadata fails.
#[span_fn]
pub async fn load_partition_metadata(
    pool: &PgPool,
    file_path: &str,
    cache: Option<&MetadataCache>,
) -> Result<Arc<ParquetMetaData>> {
    // Fast path: serve from the cache when one is configured and has the entry.
    let cached = match cache {
        Some(cache) => cache.get(file_path).await,
        None => None,
    };
    if let Some(metadata) = cached {
        debug!("cache hit for partition metadata path={file_path}");
        return Ok(metadata);
    }

    // The continuation lines of the SQL literal are kept flush-left so the
    // query text is unchanged.
    let record = sqlx::query(
        "SELECT metadata, partition_format_version
FROM partition_metadata
WHERE file_path = $1",
    )
    .bind(file_path)
    .fetch_one(pool)
    .await
    .with_context(|| format!("loading metadata for file: {}", file_path))?;

    let raw_bytes = record.try_get::<Vec<u8>, _>("metadata")?;
    let format_version = record.try_get::<i32, _>("partition_format_version")?;

    // The size handed to the cache is that of the bytes as stored in the
    // database, measured before any parsing or stripping.
    let serialized_size = raw_bytes.len() as u32;
    debug!("fetched partition metadata path={file_path} size={serialized_size}");

    let parsed = match format_version {
        2 => parse_parquet_metadata(&raw_bytes.into())
            .with_context(|| format!("parsing v2 metadata for file: {}", file_path))?,
        1 => {
            // The v1 parser takes the row count as a separate argument; it is
            // read from the partitions table.
            let count_record =
                sqlx::query("SELECT num_rows FROM lakehouse_partitions WHERE file_path = $1")
                    .bind(file_path)
                    .fetch_one(pool)
                    .await
                    .with_context(|| {
                        format!("loading num_rows for v1 partition: {}", file_path)
                    })?;
            let row_count = count_record.try_get::<i64, _>("num_rows")?;
            metadata_compat::parse_legacy_and_upgrade(&raw_bytes, row_count)
                .with_context(|| format!("parsing v1 metadata for file: {}", file_path))?
        }
        unsupported => {
            return Err(anyhow::anyhow!(
                "unsupported partition_format_version {} for file: {}",
                unsupported,
                file_path
            ));
        }
    };

    let shared = strip_column_index_info(parsed)
        .map(Arc::new)
        .with_context(|| format!("stripping column index for file: {}", file_path))?;

    if let Some(cache) = cache {
        cache
            .insert(file_path.to_owned(), Arc::clone(&shared), serialized_size)
            .await;
    }
    Ok(shared)
}
/// Deletes the `partition_metadata` rows whose `file_path` is in `file_paths`.
///
/// The statement runs inside the caller's transaction `tr`; this function does
/// not commit or roll back. An empty `file_paths` slice is a no-op and issues
/// no query.
///
/// # Errors
///
/// Returns an error if the `DELETE` statement fails.
#[span_fn]
pub async fn delete_partition_metadata_batch(
    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    file_paths: &[String],
) -> Result<()> {
    if !file_paths.is_empty() {
        // A single statement removes the whole batch via `= ANY($1)`.
        let deleted = sqlx::query("DELETE FROM partition_metadata WHERE file_path = ANY($1)")
            .bind(file_paths)
            .execute(&mut **tr)
            .await
            .with_context(|| format!("deleting {} metadata entries", file_paths.len()))?
            .rows_affected();
        debug!("deleted {} metadata entries", deleted);
    }
    Ok(())
}