use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer};
use serde::de::DeserializeOwned;
use serde::Serialize;
pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T>> {
Reader::new(bytes)?
.map(|r| {
let value = r?;
from_value::<T>(&value).map_err(crate::Error::from)
})
.collect()
}
pub fn to_avro_bytes<T: Serialize>(schema_json: &str, records: &[T]) -> crate::Result<Vec<u8>> {
let schema = Schema::parse_str(schema_json)?;
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
for record in records {
let value = to_value(record).and_then(|v| v.resolve(&schema))?;
writer.append(value)?;
}
Ok(writer.into_inner()?)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::spec::manifest_common::FileKind;
use crate::spec::manifest_entry::{ManifestEntry, MANIFEST_ENTRY_SCHEMA};
use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA;
use crate::spec::stats::BinaryTableStats;
use crate::spec::{DataFileMeta, ManifestFileMeta};
use chrono::{DateTime, Utc};
#[test]
fn test_roundtrip_manifest_file_meta() {
let value_bytes = vec![
0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
];
let original = vec![ManifestFileMeta::new(
"manifest-test-0".to_string(),
1024,
5,
2,
BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(1)]),
0,
)];
let bytes = to_avro_bytes(MANIFEST_FILE_META_SCHEMA, &original).unwrap();
let decoded = from_avro_bytes::<ManifestFileMeta>(&bytes).unwrap();
assert_eq!(original, decoded);
}
#[test]
fn test_roundtrip_manifest_entry() {
let value_bytes = vec![
0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0,
];
let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0];
let original = vec![ManifestEntry::new(
FileKind::Add,
single_value.clone(),
1,
10,
DataFileMeta {
file_name: "test.parquet".to_string(),
file_size: 100,
row_count: 50,
min_key: single_value.clone(),
max_key: single_value.clone(),
key_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)],
),
value_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)],
),
min_sequence_number: 1,
max_sequence_number: 50,
schema_id: 0,
level: 0,
extra_files: vec![],
creation_time: Some(
"2024-09-06T07:45:55.039+00:00"
.parse::<DateTime<Utc>>()
.unwrap(),
),
delete_row_count: Some(0),
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
file_source: None,
value_stats_cols: None,
},
2,
)];
let bytes = to_avro_bytes(MANIFEST_ENTRY_SCHEMA, &original).unwrap();
let decoded = from_avro_bytes::<ManifestEntry>(&bytes).unwrap();
assert_eq!(original, decoded);
}
#[tokio::test]
async fn test_read_manifest_list() {
let workdir =
std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
let path = workdir
.join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0");
let v = std::fs::read(path.to_str().unwrap()).unwrap();
let res = from_avro_bytes::<ManifestFileMeta>(&v).unwrap();
let value_bytes = vec![
0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
];
assert_eq!(
res,
vec![
ManifestFileMeta::new(
"manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(),
10,
10,
10,
BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
1
),
ManifestFileMeta::new(
"manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(),
11,
0,
10,
BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
2
)
],
);
}
#[tokio::test]
async fn test_read_manifest_entry() {
let workdir =
std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
let path =
workdir.join("tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0");
let v = std::fs::read(path.to_str().unwrap()).unwrap();
let res = from_avro_bytes::<ManifestEntry>(&v).unwrap();
let value_bytes = vec![
0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0,
];
let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0];
assert_eq!(
res,
vec![
ManifestEntry::new(
FileKind::Delete,
single_value.clone(),
1,
10,
DataFileMeta {
file_name: "f1.parquet".to_string(),
file_size: 10,
row_count: 100,
min_key: single_value.clone(),
max_key: single_value.clone(),
key_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
value_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
min_sequence_number: 1,
max_sequence_number: 100,
schema_id: 0,
level: 1,
extra_files: vec![],
creation_time: Some(
"2024-09-06T07:45:55.039+00:00"
.parse::<DateTime<Utc>>()
.unwrap()
),
delete_row_count: Some(0),
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
file_source: None,
value_stats_cols: None,
},
2
),
ManifestEntry::new(
FileKind::Add,
single_value.clone(),
2,
10,
DataFileMeta {
file_name: "f2.parquet".to_string(),
file_size: 10,
row_count: 100,
min_key: single_value.clone(),
max_key: single_value.clone(),
key_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
value_stats: BinaryTableStats::new(
value_bytes.clone(),
value_bytes.clone(),
vec![Some(1), Some(2)]
),
min_sequence_number: 1,
max_sequence_number: 100,
schema_id: 0,
level: 1,
extra_files: vec![],
creation_time: Some(
"2024-09-06T07:45:55.039+00:00"
.parse::<DateTime<Utc>>()
.unwrap()
),
delete_row_count: Some(1),
embedded_index: None,
first_row_id: None,
write_cols: None,
external_path: None,
file_source: None,
value_stats_cols: None,
},
2
),
]
)
}
}