use crate::error::Result;
use crate::spec::DataFile;
use apache_avro::types::Value;
/// Converts a [`DataFile`] into an Avro [`Value::Record`].
///
/// The record's fields are emitted in this fixed order: `content`,
/// `file_path`, `file_format`, `partition`, `record_count`,
/// `file_size_in_bytes`, `column_sizes`, `value_counts`,
/// `null_value_counts`, `split_offsets`, `key_metadata`, `lower_bounds`,
/// `upper_bounds`, `equality_ids`, `sort_order_id`.
///
/// Every field from `column_sizes` onwards is encoded as a two-branch union:
/// branch `0` carries `Value::Null` when the source value is absent, branch
/// `1` carries the payload when it is present. Keyed statistics are encoded
/// as an array of `{key, value}` records rather than as an Avro map.
///
/// NOTE(review): `content` is always written as `0`, `partition` is always an
/// empty record, and `sort_order_id` is always null, regardless of what
/// `data_file` holds — confirm that this is intended.
///
/// # Errors
///
/// The current implementation has no failing path and always returns `Ok`.
pub fn data_file_to_avro(data_file: &DataFile) -> Result<Value> {
    /// Wraps a payload as the non-null branch (index 1) of an optional union.
    fn present(inner: Value) -> Value {
        Value::Union(1, Box::new(inner))
    }

    /// Builds the null branch (index 0) of an optional union.
    fn absent() -> Value {
        Value::Union(0, Box::new(Value::Null))
    }

    /// Builds one `{key, value}` record of a keyed statistic.
    fn map_entry(key: i32, value: Value) -> Value {
        Value::Record(vec![
            ("key".to_string(), Value::Int(key)),
            ("value".to_string(), value),
        ])
    }

    /// Pairs a field name with its value in the shape `Value::Record` expects.
    fn field(name: &str, value: Value) -> (String, Value) {
        (name.to_string(), value)
    }

    // Required fields.
    let mut fields = vec![
        field("content", Value::Int(0)),
        field(
            "file_path",
            Value::String(data_file.file_path().to_string()),
        ),
        field(
            "file_format",
            Value::String(data_file.file_format().to_string()),
        ),
        field("partition", Value::Record(vec![])),
        field("record_count", Value::Long(data_file.record_count())),
        field(
            "file_size_in_bytes",
            Value::Long(data_file.file_size_in_bytes()),
        ),
    ];

    // Optional keyed statistics with long values.
    let column_sizes = match data_file.column_sizes() {
        Some(sizes) => present(Value::Array(
            sizes
                .iter()
                .map(|(k, v)| map_entry(*k, Value::Long(*v)))
                .collect(),
        )),
        None => absent(),
    };
    fields.push(field("column_sizes", column_sizes));

    let value_counts = match data_file.value_counts() {
        Some(counts) => present(Value::Array(
            counts
                .iter()
                .map(|(k, v)| map_entry(*k, Value::Long(*v)))
                .collect(),
        )),
        None => absent(),
    };
    fields.push(field("value_counts", value_counts));

    let null_value_counts = match data_file.null_value_counts() {
        Some(counts) => present(Value::Array(
            counts
                .iter()
                .map(|(k, v)| map_entry(*k, Value::Long(*v)))
                .collect(),
        )),
        None => absent(),
    };
    fields.push(field("null_value_counts", null_value_counts));

    let split_offsets = match data_file.split_offsets() {
        Some(offsets) => present(Value::Array(
            offsets.iter().map(|offset| Value::Long(*offset)).collect(),
        )),
        None => absent(),
    };
    fields.push(field("split_offsets", split_offsets));

    let key_metadata = match data_file.key_metadata() {
        Some(bytes) => present(Value::Bytes(bytes.to_vec())),
        None => absent(),
    };
    fields.push(field("key_metadata", key_metadata));

    // Optional keyed statistics with byte-string values.
    let lower_bounds = match data_file.lower_bounds() {
        Some(bounds) => present(Value::Array(
            bounds
                .iter()
                .map(|(k, v)| map_entry(*k, Value::Bytes(v.clone())))
                .collect(),
        )),
        None => absent(),
    };
    fields.push(field("lower_bounds", lower_bounds));

    let upper_bounds = match data_file.upper_bounds() {
        Some(bounds) => present(Value::Array(
            bounds
                .iter()
                .map(|(k, v)| map_entry(*k, Value::Bytes(v.clone())))
                .collect(),
        )),
        None => absent(),
    };
    fields.push(field("upper_bounds", upper_bounds));

    let equality_ids = match data_file.equality_ids() {
        Some(ids) => present(Value::Array(
            ids.iter().map(|id| Value::Int(*id)).collect(),
        )),
        None => absent(),
    };
    fields.push(field("equality_ids", equality_ids));

    // Always written as null; nothing is read from `data_file` for this field.
    fields.push(field("sort_order_id", absent()));

    Ok(Value::Record(fields))
}