use std::{collections::HashMap, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use bytes::Bytes;
use parquet::{
arrow::ArrowWriter,
basic::Compression,
file::{
metadata::{
FileMetaData, KeyValue, ParquetMetaData, ParquetMetaDataBuilder, ParquetMetaDataReader,
ParquetMetaDataWriter,
},
properties::WriterProperties,
},
schema::types::ColumnPath,
};
use crate::superfile::{LazyByteSource, LazyByteSourceError, format::kv};
/// Length of the `PAR1` magic that a Parquet file starts and ends with.
const PARQUET_MAGIC_LEN: usize = 4;
/// Width of the little-endian `u32` footer-length field that precedes the trailing magic.
const PARQUET_FOOTER_LEN_FIELD_BYTES: usize = 4;
/// Fixed trailer after the footer metadata: footer-length field + trailing magic (8 bytes).
const PARQUET_FOOTER_SUFFIX_BYTES: usize = PARQUET_FOOTER_LEN_FIELD_BYTES + PARQUET_MAGIC_LEN;
/// Smallest byte count that can hold the leading magic plus the footer suffix (12 bytes);
/// anything shorter is rejected before the footer is parsed.
const PARQUET_MIN_FILE_BYTES: usize = PARQUET_MAGIC_LEN + PARQUET_FOOTER_SUFFIX_BYTES;
/// A finished file produced by `splice_index_blobs`: Parquet bytes with the optional FTS
/// and `vec` blobs placed between the row-group data and the rewritten footer.
///
/// Offsets are absolute byte positions within `bytes`. A blob that was empty is not
/// written; its length is `0` and its offset is `0`.
pub struct ParquetParts {
    /// Complete file bytes, ending with the rewritten footer, its length and `PAR1`.
    pub bytes: Vec<u8>,
    /// Start of the FTS blob within `bytes` (`0` when no FTS blob was written).
    pub fts_offset: u64,
    /// Length of the FTS blob in bytes.
    pub fts_length: u64,
    /// Start of the `vec` blob within `bytes` (`0` when no `vec` blob was written).
    pub vec_offset: u64,
    /// Length of the `vec` blob in bytes.
    pub vec_length: u64,
}
/// Parquet footer key/value metadata flattened to owned `key -> value` strings.
pub type KvMap = HashMap<String, String>;
/// Errors produced while encoding, splicing, or reading Parquet footers.
#[derive(thiserror::Error, Debug)]
pub enum FooterError {
    /// Error reported by the `parquet` crate (writing, or decoding footer metadata).
    #[error("parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),
    /// Error reported by the `arrow` crate.
    #[error("arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),
    /// The bytes do not have the expected Parquet trailer layout; the message names the
    /// failed check.
    #[error("malformed parquet: {0}")]
    Malformed(&'static str),
    /// A lazy byte-source request failed; holds the source error's `Display` text.
    #[error("lazy source: {0}")]
    LazySource(String),
}
/// Output of `encode_parquet_body`: the Parquet bytes written by `ArrowWriter` truncated
/// just before the footer metadata, plus that footer decoded so it can be rewritten later.
pub struct EncodedBody {
    /// Everything before the footer metadata (the footer, its length field and the
    /// trailing `PAR1` have been removed).
    buf: Vec<u8>,
    /// The decoded footer that was stripped from `buf`.
    metadata: ParquetMetaData,
}
/// Encodes `batches` as an in-memory Parquet file and strips its footer.
///
/// The data is written with `ArrowWriter` using `compression`, `row_group_size` as the
/// maximum row-group row count, and per-column data page size limits from
/// `column_page_size_limits` (`(column path, limit)` pairs). The trailing footer, its
/// length field and the `PAR1` magic are then cut off the buffer. The decoded footer is
/// returned alongside so index blobs can be appended before a rewritten footer.
///
/// # Errors
/// - [`FooterError::Parquet`] if writing or decoding the footer fails.
/// - [`FooterError::Malformed`] if the writer output lacks the expected Parquet trailer.
pub fn encode_parquet_body(
    schema: &Arc<Schema>,
    batches: &[RecordBatch],
    compression: Compression,
    row_group_size: usize,
    column_page_size_limits: &[(&str, usize)],
) -> Result<EncodedBody, FooterError> {
    let mut props_builder = WriterProperties::builder()
        .set_compression(compression)
        .set_max_row_group_row_count(Some(row_group_size));
    for (col, limit) in column_page_size_limits {
        props_builder = props_builder
            .set_column_data_page_size_limit(ColumnPath::from((*col).to_string()), *limit);
    }
    let props = props_builder.build();

    let mut buf: Vec<u8> = Vec::new();
    {
        // Scoped so the writer's `&mut buf` borrow ends before the footer is inspected.
        let mut writer = ArrowWriter::try_new(&mut buf, Arc::clone(schema), Some(props))?;
        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()?;
    }

    let n = buf.len();
    if n < PARQUET_MIN_FILE_BYTES {
        return Err(FooterError::Malformed("parquet buffer too short"));
    }
    if &buf[n - PARQUET_MAGIC_LEN..n] != b"PAR1" {
        return Err(FooterError::Malformed("missing trailing PAR1 magic"));
    }
    // Trailer layout: [footer metadata][footer_len: u32 LE]["PAR1"].
    let footer_end = n - PARQUET_FOOTER_SUFFIX_BYTES;
    let footer_len_bytes: [u8; PARQUET_FOOTER_LEN_FIELD_BYTES] = buf
        [footer_end..n - PARQUET_MAGIC_LEN]
        .try_into()
        .map_err(|_| FooterError::Malformed("footer length not 4 bytes"))?;
    let footer_len = u32::from_le_bytes(footer_len_bytes) as usize;
    // `checked_sub` instead of `SUFFIX + footer_len` so the bound check cannot overflow.
    let footer_start = footer_end
        .checked_sub(footer_len)
        .ok_or(FooterError::Malformed("footer length out of range"))?;
    // Decode straight from the buffer; the decoded metadata is owned, so no copy of the
    // footer bytes is needed before truncating.
    let metadata = ParquetMetaDataReader::decode_metadata(&buf[footer_start..footer_end])?;
    buf.truncate(footer_start);
    Ok(EncodedBody { buf, metadata })
}
/// Appends the FTS and `vec` index blobs to an [`EncodedBody`] and writes a new footer.
///
/// Resulting layout: `[body][fts_blob][vec_blob][footer][footer_len]["PAR1"]`. An empty
/// blob occupies no bytes, reports offset `0`, and contributes no offset/length KV entries.
///
/// The rewritten footer keeps every existing key/value entry. It then appends `extra_kv` in
/// order, followed by the FTS offset/length and the `vec` offset/length entries for the
/// blobs that are present.
///
/// # Errors
/// Returns [`FooterError::Parquet`] if serializing the rewritten footer fails.
pub fn splice_index_blobs(
    body: EncodedBody,
    fts_blob: &[u8],
    vec_blob: &[u8],
    extra_kv: &[(String, String)],
) -> Result<ParquetParts, FooterError> {
    let EncodedBody { mut buf, metadata } = body;

    // Appends `blob` and returns its starting offset; an empty blob writes nothing and
    // records offset 0.
    let mut append = |blob: &[u8]| -> u64 {
        if blob.is_empty() {
            return 0;
        }
        let start = buf.len() as u64;
        buf.extend_from_slice(blob);
        start
    };
    let fts_offset = append(fts_blob);
    let vec_offset = append(vec_blob);

    let old_fm = metadata.file_metadata();
    let mut kvs = old_fm.key_value_metadata().cloned().unwrap_or_default();
    kvs.extend(
        extra_kv
            .iter()
            .map(|(key, value)| KeyValue::new(key.clone(), Some(value.clone()))),
    );
    let index_entries = [
        (fts_blob, kv::FTS_OFFSET, kv::FTS_LENGTH, fts_offset),
        (vec_blob, kv::VEC_OFFSET, kv::VEC_LENGTH, vec_offset),
    ];
    for (blob, offset_key, length_key, offset) in index_entries {
        if blob.is_empty() {
            continue;
        }
        kvs.push(KeyValue::new(offset_key.to_string(), Some(offset.to_string())));
        kvs.push(KeyValue::new(
            length_key.to_string(),
            Some((blob.len() as u64).to_string()),
        ));
    }

    let rebuilt_fm = FileMetaData::new(
        old_fm.version(),
        old_fm.num_rows(),
        old_fm.created_by().map(String::from),
        Some(kvs),
        old_fm.schema_descr_ptr(),
        old_fm.column_orders().cloned(),
    );
    // NOTE(review): `encode_parquet_body` decodes only the footer bytes, so page indexes are
    // presumably absent here. If they were present, the writer would re-emit them after the
    // blobs; confirm the resulting index offsets before relying on that path.
    let rebuilt = ParquetMetaDataBuilder::new(rebuilt_fm)
        .set_row_groups(metadata.row_groups().to_vec())
        .set_column_index(metadata.column_index().cloned())
        .set_offset_index(metadata.offset_index().cloned())
        .build();
    ParquetMetaDataWriter::new(&mut buf, &rebuilt).finish()?;

    Ok(ParquetParts {
        bytes: buf,
        fts_offset,
        fts_length: fts_blob.len() as u64,
        vec_offset,
        vec_length: vec_blob.len() as u64,
    })
}
/// Parses the footer of an in-memory Parquet file and returns its key/value metadata.
///
/// Only entries that carry a value are returned (see [`extract_kv_map`]).
///
/// # Errors
/// - [`FooterError::Malformed`] in three cases: `bytes` is shorter than a minimal Parquet
///   file, it lacks the trailing `PAR1` magic, or it declares a footer longer than the
///   bytes available.
/// - [`FooterError::Parquet`] if the footer bytes fail to decode.
pub fn read_kv_metadata(bytes: &[u8]) -> Result<KvMap, FooterError> {
    let n = bytes.len();
    if n < PARQUET_MIN_FILE_BYTES || &bytes[n - PARQUET_MAGIC_LEN..n] != b"PAR1" {
        return Err(FooterError::Malformed("not a Parquet file (missing PAR1)"));
    }
    // Trailer layout: [footer metadata][footer_len: u32 LE]["PAR1"].
    let footer_end = n - PARQUET_FOOTER_SUFFIX_BYTES;
    let footer_len_bytes: [u8; PARQUET_FOOTER_LEN_FIELD_BYTES] = bytes
        [footer_end..n - PARQUET_MAGIC_LEN]
        .try_into()
        .map_err(|_| FooterError::Malformed("footer length not 4 bytes"))?;
    let footer_len = u32::from_le_bytes(footer_len_bytes) as usize;
    // The length field is untrusted input: `SUFFIX + footer_len` can overflow `usize` on
    // 32-bit targets, so bound-check with `checked_sub` instead.
    let footer_start = footer_end
        .checked_sub(footer_len)
        .ok_or(FooterError::Malformed("footer length out of range"))?;
    let metadata = ParquetMetaDataReader::decode_metadata(&bytes[footer_start..footer_end])?;
    extract_kv_map(&metadata)
}
/// Reads and decodes the Parquet footer metadata from a [`LazyByteSource`].
///
/// First issues one `tail` request for `tail_speculative_bytes`, raised to at least the
/// 8-byte footer suffix. If that tail already contains the whole footer, the metadata is
/// decoded from it and no further request is made. Otherwise one follow-up `range` request
/// fetches exactly the footer bytes.
///
/// # Errors
/// - [`FooterError::LazySource`] if a `tail` or `range` request fails.
/// - [`FooterError::Malformed`] if the source is too short, lacks the trailing `PAR1`, or
///   declares a footer longer than the source. It is also returned when the source hands
///   back a tail inconsistent with its reported size.
/// - [`FooterError::Parquet`] if the footer bytes fail to decode.
pub async fn read_parquet_metadata_lazy(
    source: &dyn LazyByteSource,
    tail_speculative_bytes: u64,
) -> Result<ParquetMetaData, FooterError> {
    // A hint smaller than the suffix would leave the length field / magic outside the tail
    // and make the slicing below underflow, so always fetch at least the suffix.
    let tail_request = tail_speculative_bytes.max(PARQUET_FOOTER_SUFFIX_BYTES as u64);
    let (tail, total) = source
        .tail(tail_request)
        .await
        .map_err(footer_lazy_err)?;
    if total < PARQUET_MIN_FILE_BYTES as u64 {
        return Err(FooterError::Malformed("not a Parquet file (too short)"));
    }
    let n = tail.len();
    if n < PARQUET_FOOTER_SUFFIX_BYTES {
        return Err(FooterError::Malformed("tail shorter than parquet footer suffix"));
    }
    // Absolute offset of `tail[0]` within the source.
    let spec_start = total
        .checked_sub(n as u64)
        .ok_or(FooterError::Malformed("tail longer than source"))?;
    if &tail[n - PARQUET_MAGIC_LEN..n] != b"PAR1" {
        return Err(FooterError::Malformed("not a Parquet file (missing PAR1)"));
    }
    let footer_len_bytes: [u8; PARQUET_FOOTER_LEN_FIELD_BYTES] = tail
        [n - PARQUET_FOOTER_SUFFIX_BYTES..n - PARQUET_MAGIC_LEN]
        .try_into()
        .map_err(|_| FooterError::Malformed("footer length not 4 bytes"))?;
    let footer_len = u64::from(u32::from_le_bytes(footer_len_bytes));
    // Absolute offsets stay in u64 so sources larger than `usize::MAX` (e.g. >4 GiB on
    // 32-bit targets) are addressed without truncation.
    let footer_end_abs = total - PARQUET_FOOTER_SUFFIX_BYTES as u64;
    let footer_start_abs = footer_end_abs
        .checked_sub(footer_len)
        .ok_or(FooterError::Malformed("footer length out of range"))?;
    let footer_bytes: Bytes = if footer_start_abs >= spec_start {
        // The footer lies inside the tail: its start is <= n - SUFFIX, so the cast is lossless.
        let off_in_tail = (footer_start_abs - spec_start) as usize;
        tail.slice(off_in_tail..n - PARQUET_FOOTER_SUFFIX_BYTES)
    } else {
        source
            .range(footer_start_abs, footer_len)
            .await
            .map_err(footer_lazy_err)?
    };
    ParquetMetaDataReader::decode_metadata(&footer_bytes).map_err(FooterError::from)
}
/// Lazily fetches the Parquet footer from `source` and returns its key/value entries.
///
/// Composition of [`read_parquet_metadata_lazy`], which performs the range requests
/// driven by `tail_speculative_bytes`, and [`extract_kv_map`].
pub async fn read_kv_metadata_lazy(
    source: &dyn LazyByteSource,
    tail_speculative_bytes: u64,
) -> Result<KvMap, FooterError> {
    read_parquet_metadata_lazy(source, tail_speculative_bytes)
        .await
        .and_then(|metadata| extract_kv_map(&metadata))
}
pub fn extract_kv_map(metadata: &ParquetMetaData) -> Result<KvMap, FooterError> {
let mut out: KvMap = HashMap::new();
if let Some(kvs) = metadata.file_metadata().key_value_metadata() {
for kv in kvs {
if let Some(v) = &kv.value {
out.insert(kv.key.clone(), v.clone());
}
}
}
Ok(out)
}
/// Converts a [`LazyByteSourceError`] into [`FooterError::LazySource`], keeping its message.
fn footer_lazy_err(err: LazyByteSourceError) -> FooterError {
    let message = err.to_string();
    FooterError::LazySource(message)
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    };

    use arrow_array::{Float64Array, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field};

    use super::*;
    use crate::superfile::lazy_source::{BytesLazyByteSource, LazyByteSource, LazyByteSourceError};

    /// Three-column schema (`id`, `score`, `category`) shared by every test.
    fn small_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("score", DataType::Float64, false),
            Field::new("category", DataType::Utf8, false),
        ]))
    }

    /// Three rows conforming to [`small_schema`].
    fn small_batch(schema: &Arc<Schema>) -> RecordBatch {
        let ids = UInt64Array::from(vec![0u64, 1, 2]);
        let scores = Float64Array::from(vec![1.1, 2.2, 3.3]);
        let cats = StringArray::from(vec!["a", "b", "a"]);
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(ids), Arc::new(scores), Arc::new(cats)],
        )
        .expect("build RecordBatch")
    }

    /// The `inf.*` identification entries used by the KV round-trip tests.
    fn sample_extra_kv() -> Vec<(String, String)> {
        vec![
            ("inf.format".to_string(), "infino-superfile".to_string()),
            ("inf.format_version".to_string(), "1.0.0".to_string()),
            ("inf.id_column".to_string(), "id".to_string()),
        ]
    }

    /// Runs the production pipeline: [`encode_parquet_body`] then [`splice_index_blobs`].
    // The parameter list is the union of both entry points' parameters, hence the allowance.
    #[allow(clippy::too_many_arguments)]
    fn write_with_blobs(
        schema: &Arc<Schema>,
        batches: &[RecordBatch],
        fts_blob: &[u8],
        vec_blob: &[u8],
        extra_kv: &[(String, String)],
        compression: Compression,
        row_group_size: usize,
        column_page_size_limits: &[(&str, usize)],
    ) -> Result<ParquetParts, FooterError> {
        let body = encode_parquet_body(
            schema,
            batches,
            compression,
            row_group_size,
            column_page_size_limits,
        )?;
        splice_index_blobs(body, fts_blob, vec_blob, extra_kv)
    }

    /// Writes one [`small_batch`] with SNAPPY, 1024-row groups and default page limits,
    /// which is the configuration shared by every test below.
    fn write_small(
        fts_blob: &[u8],
        vec_blob: &[u8],
        extra_kv: &[(String, String)],
    ) -> ParquetParts {
        let schema = small_schema();
        let batch = small_batch(&schema);
        write_with_blobs(
            &schema,
            &[batch],
            fts_blob,
            vec_blob,
            extra_kv,
            Compression::SNAPPY,
            1024,
            &[],
        )
        .expect("write parquet with blobs")
    }

    /// Returns `length` bytes of `parts.bytes` starting at absolute `offset`.
    fn blob_at(parts: &ParquetParts, offset: u64, length: u64) -> &[u8] {
        let start = offset as usize;
        &parts.bytes[start..start + length as usize]
    }

    #[test]
    fn write_with_no_blobs_produces_valid_parquet() {
        let parts = write_small(&[], &[], &[]);
        assert_eq!(&parts.bytes[..4], b"PAR1");
        assert_eq!(&parts.bytes[parts.bytes.len() - 4..], b"PAR1");
        assert_eq!(parts.fts_length, 0);
        assert_eq!(parts.vec_length, 0);
    }

    #[test]
    fn write_with_fts_blob_records_offset_and_length() {
        let fts_blob = b"FTS_BLOB_BYTES_HERE".to_vec();
        let parts = write_small(&fts_blob, &[], &[]);
        assert_eq!(parts.fts_length as usize, fts_blob.len());
        // The blob must land after the leading `PAR1` magic.
        assert!(parts.fts_offset > 4);
        assert_eq!(
            blob_at(&parts, parts.fts_offset, parts.fts_length),
            fts_blob.as_slice()
        );
    }

    #[test]
    fn write_with_vec_blob_only_records_offset() {
        let vec_blob = vec![0xAAu8; 256];
        let parts = write_small(&[], &vec_blob, &[]);
        assert_eq!(parts.fts_length, 0);
        assert_eq!(parts.vec_length as usize, vec_blob.len());
        assert!(parts.vec_offset > 0);
        assert_eq!(
            blob_at(&parts, parts.vec_offset, parts.vec_length),
            vec_blob.as_slice()
        );
    }

    #[test]
    fn write_with_both_blobs_produces_distinct_offsets() {
        let fts_blob = vec![0x01u8; 100];
        let vec_blob = vec![0x02u8; 200];
        let parts = write_small(&fts_blob, &vec_blob, &[]);
        assert!(parts.vec_offset > parts.fts_offset);
        // The `vec` blob is written immediately after the FTS blob.
        assert_eq!(parts.fts_offset + parts.fts_length, parts.vec_offset);
    }

    #[test]
    fn read_kv_metadata_finds_extra_kv_entries() {
        let parts = write_small(&[], &[], &sample_extra_kv());
        let kv = read_kv_metadata(&parts.bytes).expect("read kv metadata");
        assert_eq!(
            kv.get("inf.format").map(String::as_str),
            Some("infino-superfile")
        );
        assert_eq!(
            kv.get("inf.format_version").map(String::as_str),
            Some("1.0.0")
        );
        assert_eq!(kv.get("inf.id_column").map(String::as_str), Some("id"));
    }

    #[test]
    fn read_kv_metadata_includes_fts_offsets_when_blob_present() {
        let fts = vec![0xCCu8; 64];
        let parts = write_small(&fts, &[], &[]);
        let kv = read_kv_metadata(&parts.bytes).expect("read kv metadata");
        assert!(kv.contains_key("inf.fts.offset"));
        assert!(kv.contains_key("inf.fts.length"));
        assert!(!kv.contains_key("inf.vec.offset"));
    }

    #[test]
    fn read_kv_metadata_rejects_non_parquet_input() {
        let err = read_kv_metadata(&[0u8; 16]).expect_err("expected error");
        assert!(matches!(err, FooterError::Malformed(_)));
    }

    /// In-memory source that counts every async `range` request and never serves
    /// synchronously, so the tests can observe how many GETs the lazy reader issues.
    #[derive(Debug)]
    struct CountingFooterSource {
        inner: BytesLazyByteSource,
        async_calls: Arc<AtomicU64>,
    }

    impl CountingFooterSource {
        fn new(bytes: Bytes) -> Self {
            Self {
                inner: BytesLazyByteSource::new(bytes),
                async_calls: Arc::new(AtomicU64::new(0)),
            }
        }

        fn counter(&self) -> Arc<AtomicU64> {
            Arc::clone(&self.async_calls)
        }
    }

    #[async_trait::async_trait]
    impl LazyByteSource for CountingFooterSource {
        fn size(&self) -> u64 {
            self.inner.size()
        }

        async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError> {
            self.async_calls.fetch_add(1, Ordering::Relaxed);
            self.inner.range(start, len).await
        }

        fn try_get_range_sync(&self, _start: u64, _len: u64) -> Option<Bytes> {
            None
        }
    }

    /// Writes a file with [`sample_extra_kv`] and reads its KV map both eagerly and lazily,
    /// returning `(eager, lazy, async range requests issued by the lazy read)`.
    async fn eager_and_lazy_kv(tail_speculative_bytes: u64) -> (KvMap, KvMap, u64) {
        let parts = write_small(&[], &[], &sample_extra_kv());
        let eager = read_kv_metadata(&parts.bytes).expect("eager kv");
        let source = CountingFooterSource::new(Bytes::from(parts.bytes));
        let counter = source.counter();
        let lazy = read_kv_metadata_lazy(&source, tail_speculative_bytes)
            .await
            .expect("lazy kv");
        (eager, lazy, counter.load(Ordering::Relaxed))
    }

    #[tokio::test]
    async fn lazy_kv_metadata_one_range_when_tail_covers_footer() {
        let (eager, lazy, calls) = eager_and_lazy_kv(64 * 1024).await;
        assert_eq!(calls, 1, "tail wholly contains footer ⇒ exactly 1 range GET");
        assert_eq!(eager, lazy, "lazy + eager KV maps must agree");
    }

    #[tokio::test]
    async fn lazy_kv_metadata_two_ranges_when_followup_needed() {
        let (eager, lazy, calls) = eager_and_lazy_kv(16).await;
        assert_eq!(calls, 2, "tail < footer ⇒ 1 tail GET + 1 follow-up GET");
        assert_eq!(eager, lazy, "lazy + eager KV maps must agree");
    }
}