use crate::types::{level::ParquetFileMeta, schema::TableSchema, MeruError, Result};
use std::collections::HashMap;
pub fn encode_footer_kv(
meta: &ParquetFileMeta,
schema: &TableSchema,
) -> Result<Vec<(String, String)>> {
let meta_json = meta.serialize()?;
let schema_json =
serde_json::to_string(schema).map_err(|e| MeruError::Parquet(e.to_string()))?;
Ok(vec![
(ParquetFileMeta::FOOTER_KEY.to_string(), meta_json),
(ParquetFileMeta::SCHEMA_KEY.to_string(), schema_json),
])
}
pub fn decode_footer_kv(kv: &HashMap<String, String>) -> Result<(ParquetFileMeta, TableSchema)> {
let meta_json = kv.get(ParquetFileMeta::FOOTER_KEY).ok_or_else(|| {
MeruError::Corruption(format!(
"missing '{}' in Parquet KV footer",
ParquetFileMeta::FOOTER_KEY
))
})?;
let schema_json = kv.get(ParquetFileMeta::SCHEMA_KEY).ok_or_else(|| {
MeruError::Corruption(format!(
"missing '{}' in Parquet KV footer",
ParquetFileMeta::SCHEMA_KEY
))
})?;
let meta: ParquetFileMeta = ParquetFileMeta::deserialize(meta_json).map_err(|e| {
MeruError::Corruption(format!(
"failed to parse '{}': {e}",
ParquetFileMeta::FOOTER_KEY
))
})?;
let schema: TableSchema = serde_json::from_str(schema_json).map_err(|e| {
MeruError::Corruption(format!(
"failed to parse '{}': {e}",
ParquetFileMeta::SCHEMA_KEY
))
})?;
Ok((meta, schema))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{
level::{Level, ParquetFileMeta},
schema::{ColumnDef, ColumnType, TableSchema},
};
fn sample_meta() -> ParquetFileMeta {
ParquetFileMeta {
level: Level(2),
seq_min: 1,
seq_max: 100,
key_min: vec![0x00, 0x01, 0x7F, 0x80, 0xFE, 0xFF],
key_max: vec![0xFF; 16],
num_rows: 500,
file_size: 1024 * 1024,
dv_path: Some("s3://bucket/path.puffin".into()),
dv_offset: Some(42),
dv_length: Some(128),
format: None,
column_stats: None,
}
}
fn sample_schema() -> TableSchema {
TableSchema {
table_name: "events".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "val".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
ColumnDef {
name: "fb".into(),
col_type: ColumnType::FixedLenByteArray(8),
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
#[test]
fn roundtrip_preserves_every_field() {
let meta = sample_meta();
let schema = sample_schema();
let kv = encode_footer_kv(&meta, &schema).unwrap();
let map: HashMap<_, _> = kv.into_iter().collect();
let (got_meta, got_schema) = decode_footer_kv(&map).unwrap();
assert_eq!(got_meta.level, meta.level);
assert_eq!(got_meta.seq_min, meta.seq_min);
assert_eq!(got_meta.seq_max, meta.seq_max);
assert_eq!(
got_meta.key_min, meta.key_min,
"key_min must round-trip byte-for-byte"
);
assert_eq!(
got_meta.key_max, meta.key_max,
"key_max must round-trip byte-for-byte"
);
assert_eq!(got_meta.num_rows, meta.num_rows);
assert_eq!(got_meta.file_size, meta.file_size);
assert_eq!(got_meta.dv_path, meta.dv_path);
assert_eq!(got_meta.dv_offset, meta.dv_offset);
assert_eq!(got_meta.dv_length, meta.dv_length);
assert_eq!(got_schema.table_name, schema.table_name);
assert_eq!(got_schema.primary_key, schema.primary_key);
assert_eq!(got_schema.columns.len(), schema.columns.len());
for (i, (orig, got)) in schema.columns.iter().zip(&got_schema.columns).enumerate() {
assert_eq!(orig.name, got.name, "column {i} name");
assert_eq!(orig.col_type, got.col_type, "column {i} col_type");
assert_eq!(orig.nullable, got.nullable, "column {i} nullable");
}
}
#[test]
fn encode_returns_two_keys_in_stable_order() {
let meta = sample_meta();
let schema = sample_schema();
let kv = encode_footer_kv(&meta, &schema).unwrap();
assert_eq!(kv.len(), 2);
assert_eq!(kv[0].0, ParquetFileMeta::FOOTER_KEY);
assert_eq!(kv[1].0, ParquetFileMeta::SCHEMA_KEY);
}
#[test]
fn missing_meta_key_errors_with_key_name() {
let schema = sample_schema();
let mut map = HashMap::new();
map.insert(
ParquetFileMeta::SCHEMA_KEY.to_string(),
serde_json::to_string(&schema).unwrap(),
);
let err = decode_footer_kv(&map).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains(ParquetFileMeta::FOOTER_KEY) && msg.contains("missing"),
"error should name the missing meta key: {msg}"
);
}
#[test]
fn missing_schema_key_errors_with_key_name() {
let meta = sample_meta();
let mut map = HashMap::new();
map.insert(
ParquetFileMeta::FOOTER_KEY.to_string(),
meta.serialize().unwrap(),
);
let err = decode_footer_kv(&map).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains(ParquetFileMeta::SCHEMA_KEY) && msg.contains("missing"),
"error should name the missing schema key: {msg}"
);
}
#[test]
fn empty_kv_errors() {
let kv: HashMap<String, String> = HashMap::new();
assert!(decode_footer_kv(&kv).is_err());
}
#[test]
fn corrupt_meta_json_errors_with_key_name() {
let schema = sample_schema();
let mut map = HashMap::new();
map.insert(
ParquetFileMeta::FOOTER_KEY.to_string(),
"not-json-at-all {[".into(),
);
map.insert(
ParquetFileMeta::SCHEMA_KEY.to_string(),
serde_json::to_string(&schema).unwrap(),
);
let err = decode_footer_kv(&map).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains(ParquetFileMeta::FOOTER_KEY) && msg.contains("parse"),
"error should name the failing key: {msg}"
);
assert!(matches!(err, MeruError::Corruption(_)));
}
#[test]
fn corrupt_schema_json_errors_with_key_name() {
let meta = sample_meta();
let mut map = HashMap::new();
map.insert(
ParquetFileMeta::FOOTER_KEY.to_string(),
meta.serialize().unwrap(),
);
map.insert(
ParquetFileMeta::SCHEMA_KEY.to_string(),
"{\"table_name\": not-a-string}".into(),
);
let err = decode_footer_kv(&map).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains(ParquetFileMeta::SCHEMA_KEY) && msg.contains("parse"),
"error should name the failing key: {msg}"
);
assert!(matches!(err, MeruError::Corruption(_)));
}
#[test]
fn unknown_extra_keys_are_ignored() {
let meta = sample_meta();
let schema = sample_schema();
let mut map: HashMap<String, String> = encode_footer_kv(&meta, &schema)
.unwrap()
.into_iter()
.collect();
map.insert(
"merutable.future_feature".into(),
"some opaque value".into(),
);
map.insert("third_party.random".into(), "{\"nested\":true}".into());
let (got_meta, got_schema) = decode_footer_kv(&map).unwrap();
assert_eq!(got_meta.num_rows, meta.num_rows);
assert_eq!(got_schema.table_name, schema.table_name);
}
#[test]
fn key_bytes_roundtrip_arbitrary_non_utf8() {
let mut meta = sample_meta();
meta.key_min = (0u8..=255).collect();
meta.key_max = (0u8..=255).rev().collect();
let schema = sample_schema();
let map: HashMap<_, _> = encode_footer_kv(&meta, &schema)
.unwrap()
.into_iter()
.collect();
let (got, _) = decode_footer_kv(&map).unwrap();
assert_eq!(got.key_min, meta.key_min);
assert_eq!(got.key_max, meta.key_max);
}
}