use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;
use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use super::_serde::DataFileSerde;
use super::{
Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
};
use crate::error::Result;
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};
/// Metadata describing a single file: a data file or a delete file, as indicated by `content`.
///
/// Built through the derived builder. `content`, `file_path`, `file_format`, `record_count`
/// and `file_size_in_bytes` have no builder default and must be provided. Every other field
/// has a default: `partition` falls back to `Struct::empty()`, `partition_spec_id` falls back
/// to `DEFAULT_PARTITION_SPEC_ID`, and the rest use their type's `Default` (empty maps / `None`).
/// Fields are `pub(crate)`; outside the crate they are read through the accessor methods.
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
/// Kind of content stored in the file (data rows, position deletes or equality deletes).
pub(crate) content: DataContentType,
/// Location of the file.
pub(crate) file_path: String,
/// On-disk format of the file.
pub(crate) file_format: DataFileFormat,
/// Partition values for this file; defaults to `Struct::empty()` when not set on the builder.
#[builder(default = "Struct::empty()")]
pub(crate) partition: Struct,
/// Number of records in the file.
pub(crate) record_count: u64,
/// Size of the file in bytes.
pub(crate) file_size_in_bytes: u64,
/// Per-column size in bytes, keyed by an `i32` column id (presumably the schema field id).
#[builder(default)]
pub(crate) column_sizes: HashMap<i32, u64>,
/// Per-column value count, keyed by column id.
#[builder(default)]
pub(crate) value_counts: HashMap<i32, u64>,
/// Per-column null value count, keyed by column id.
#[builder(default)]
pub(crate) null_value_counts: HashMap<i32, u64>,
/// Per-column NaN value count, keyed by column id.
#[builder(default)]
pub(crate) nan_value_counts: HashMap<i32, u64>,
/// Per-column lower bound, keyed by column id.
#[builder(default)]
pub(crate) lower_bounds: HashMap<i32, Datum>,
/// Per-column upper bound, keyed by column id.
#[builder(default)]
pub(crate) upper_bounds: HashMap<i32, Datum>,
/// Opaque key metadata bytes, if any (NOTE(review): presumably encryption key metadata — confirm).
#[builder(default)]
pub(crate) key_metadata: Option<Vec<u8>>,
/// Split offsets within the file, if recorded.
#[builder(default)]
pub(crate) split_offsets: Option<Vec<i64>>,
/// Field ids used for equality comparison, if any (presumably only set for equality delete files).
#[builder(default)]
pub(crate) equality_ids: Option<Vec<i32>>,
/// Sort order id, if any. The builder setter takes a plain `i32` because of `strip_option`.
#[builder(default, setter(strip_option))]
pub(crate) sort_order_id: Option<i32>,
/// First row id, if assigned (NOTE(review): presumably v3 row lineage — confirm).
#[builder(default)]
pub(crate) first_row_id: Option<i64>,
/// Id of the partition spec that `partition` belongs to; defaults to `DEFAULT_PARTITION_SPEC_ID`.
#[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
pub(crate) partition_spec_id: i32,
/// Path of a data file this file refers to, if any (presumably set on delete files — confirm).
#[builder(default)]
pub(crate) referenced_data_file: Option<String>,
/// Offset of the content within the file, if any (NOTE(review): presumably for deletion vectors).
#[builder(default)]
pub(crate) content_offset: Option<i64>,
/// Size of the content in bytes, if any (NOTE(review): presumably paired with `content_offset`).
#[builder(default)]
pub(crate) content_size_in_bytes: Option<i64>,
}
impl DataFile {
    // ---- identity and size ----

    /// Returns the kind of content this file holds.
    pub fn content_type(&self) -> DataContentType {
        self.content
    }

    /// Returns the location of the file as a string slice.
    pub fn file_path(&self) -> &str {
        self.file_path.as_str()
    }

    /// Returns the on-disk format of the file.
    pub fn file_format(&self) -> DataFileFormat {
        self.file_format
    }

    /// Returns the partition values recorded for this file.
    pub fn partition(&self) -> &Struct {
        &self.partition
    }

    /// Returns the number of records in the file.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }

    /// Returns the size of the file in bytes.
    pub fn file_size_in_bytes(&self) -> u64 {
        self.file_size_in_bytes
    }

    // ---- per-column statistics (all keyed by an `i32` column id) ----

    /// Returns the per-column sizes in bytes.
    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
        &self.column_sizes
    }

    /// Returns the per-column value counts.
    pub fn value_counts(&self) -> &HashMap<i32, u64> {
        &self.value_counts
    }

    /// Returns the per-column null value counts.
    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
        &self.null_value_counts
    }

    /// Returns the per-column NaN value counts.
    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
        &self.nan_value_counts
    }

    /// Returns the per-column lower bounds.
    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
        &self.lower_bounds
    }

    /// Returns the per-column upper bounds.
    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
        &self.upper_bounds
    }

    // ---- optional metadata ----

    /// Returns the key metadata bytes, borrowed, or `None` when absent.
    pub fn key_metadata(&self) -> Option<&[u8]> {
        self.key_metadata.as_deref()
    }

    /// Returns the split offsets, borrowed as a slice, or `None` when absent.
    pub fn split_offsets(&self) -> Option<&[i64]> {
        self.split_offsets.as_deref()
    }

    /// Returns an owned copy of the equality field ids, or `None` when absent.
    ///
    /// Each call allocates a new `Vec`; changing the result does not affect `self`.
    pub fn equality_ids(&self) -> Option<Vec<i32>> {
        self.equality_ids.clone()
    }

    /// Returns the sort order id, if one was set.
    pub fn sort_order_id(&self) -> Option<i32> {
        self.sort_order_id
    }

    /// Returns the first row id, if one was assigned.
    pub fn first_row_id(&self) -> Option<i64> {
        self.first_row_id
    }

    /// Returns an owned copy of the referenced data file path, or `None` when absent.
    pub fn referenced_data_file(&self) -> Option<String> {
        self.referenced_data_file.clone()
    }

    /// Returns the content offset, if recorded.
    pub fn content_offset(&self) -> Option<i64> {
        self.content_offset
    }

    /// Returns the content size in bytes, if recorded.
    pub fn content_size_in_bytes(&self) -> Option<i64> {
        self.content_size_in_bytes
    }
}
/// Serializes `data_files` as Avro records and writes them to `writer`.
///
/// The Avro writer schema is selected by `version` and built from `partition_type`. Each
/// [`DataFile`] is converted through [`DataFileSerde`], resolved against that schema, and
/// appended to an Avro writer that wraps `writer`.
///
/// # Returns
///
/// The `usize` reported by the final `flush` of the Avro writer. Per the apache_avro docs,
/// this is the number of bytes it wrote.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the Avro schema cannot be built for `partition_type`; this previously panicked;
/// - a data file cannot be converted or resolved against the schema;
/// - appending or flushing fails.
pub fn write_data_files_to_avro<W: Write>(
    writer: &mut W,
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<usize> {
    // Propagate schema-construction failures instead of panicking inside a fallible API.
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);
    for data_file in data_files {
        // NOTE(review): records are always built with `FormatVersion::V1`, whatever `version`
        // is. Presumably this is deliberate: V1-only fields get populated, and `resolve` drops
        // fields the v2/v3 schema lacks. Confirm before switching this to `version`.
        let value = to_value(DataFileSerde::try_from(
            data_file,
            partition_type,
            FormatVersion::V1,
        )?)?
        .resolve(&avro_schema)?;
        writer.append(value)?;
    }
    Ok(writer.flush()?)
}
/// Reads Avro-encoded data file records from `reader` and converts them into [`DataFile`]s.
///
/// The Avro reader schema is selected by `version` and built from `partition_type`. Each
/// decoded record is deserialized into [`DataFileSerde`]. It is then converted using
/// `partition_spec_id`, `partition_type` and `schema`.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the Avro schema cannot be built for `partition_type`; this previously panicked;
/// - the Avro stream cannot be opened or decoded;
/// - a record fails to deserialize or convert.
///
/// Collection stops at the first failing record.
pub fn read_data_files_from_avro<R: Read>(
    reader: &mut R,
    schema: &Schema,
    partition_spec_id: i32,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    // Propagate schema-construction failures instead of panicking inside a fallible API.
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };
    let reader = AvroReader::with_schema(&avro_schema, reader)?;
    reader
        .into_iter()
        .map(|value| {
            // `try_into` here is `DataFileSerde`'s own three-argument conversion,
            // not `std::convert::TryInto`.
            from_value::<DataFileSerde>(&value?)?.try_into(
                partition_spec_id,
                partition_type,
                schema,
            )
        })
        .collect::<Result<Vec<_>>>()
}
/// Kind of content stored in a file.
///
/// Each variant has an explicit `i32` discriminant, which the `TryFrom<i32>` impl maps back to
/// a variant. `Data` (0) is the `Default`. The serde derives encode variants by name and
/// declaration order, not by these discriminants, so do not reorder variants.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// Regular data rows (discriminant 0); the default variant.
    #[default]
    Data = 0,
    /// Position delete content (discriminant 1).
    PositionDeletes = 1,
    /// Equality delete content (discriminant 2).
    EqualityDeletes = 2,
}
impl TryFrom<i32> for DataContentType {
type Error = Error;
fn try_from(v: i32) -> Result<DataContentType> {
match v {
0 => Ok(DataContentType::Data),
1 => Ok(DataContentType::PositionDeletes),
2 => Ok(DataContentType::EqualityDeletes),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("data content type {v} is invalid"),
)),
}
}
}
/// On-disk format of a file.
///
/// (De)serialized through its string form. `Display` renders the lowercase name, and
/// `FromStr` lowercases its input before matching, so parsing is case-insensitive.
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Apache Avro; string form `avro`.
    Avro,
    /// Apache ORC; string form `orc`.
    Orc,
    /// Apache Parquet; string form `parquet`.
    Parquet,
    /// Puffin; string form `puffin`.
    Puffin,
}
impl FromStr for DataFileFormat {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"avro" => Ok(Self::Avro),
"orc" => Ok(Self::Orc),
"parquet" => Ok(Self::Parquet),
"puffin" => Ok(Self::Puffin),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("Unsupported data file format: {s}"),
)),
}
}
}
impl std::fmt::Display for DataFileFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DataFileFormat::Avro => write!(f, "avro"),
DataFileFormat::Orc => write!(f, "orc"),
DataFileFormat::Parquet => write!(f, "parquet"),
DataFileFormat::Puffin => write!(f, "puffin"),
}
}
}
#[cfg(test)]
mod test {
    use super::{DataContentType, DataFileFormat};

    #[test]
    fn test_data_content_type_default() {
        assert_eq!(DataContentType::default(), DataContentType::Data);
    }

    #[test]
    fn test_data_content_type_default_value() {
        assert_eq!(DataContentType::default() as i32, 0);
    }

    /// `TryFrom<i32>` accepts exactly the declared discriminants and rejects everything else.
    #[test]
    fn test_data_content_type_try_from_i32() {
        assert_eq!(DataContentType::try_from(0).unwrap(), DataContentType::Data);
        assert_eq!(
            DataContentType::try_from(1).unwrap(),
            DataContentType::PositionDeletes
        );
        assert_eq!(
            DataContentType::try_from(2).unwrap(),
            DataContentType::EqualityDeletes
        );
        assert!(DataContentType::try_from(3).is_err());
        assert!(DataContentType::try_from(-1).is_err());
    }

    /// Casting a variant to `i32` and converting back yields the same variant.
    #[test]
    fn test_data_content_type_discriminant_round_trip() {
        for content in [
            DataContentType::Data,
            DataContentType::PositionDeletes,
            DataContentType::EqualityDeletes,
        ] {
            assert_eq!(DataContentType::try_from(content as i32).unwrap(), content);
        }
    }

    /// `Display` output parses back to the same format, since serde relies on this pair.
    #[test]
    fn test_data_file_format_display_round_trip() {
        for format in [
            DataFileFormat::Avro,
            DataFileFormat::Orc,
            DataFileFormat::Parquet,
            DataFileFormat::Puffin,
        ] {
            assert_eq!(
                format.to_string().parse::<DataFileFormat>().unwrap(),
                format
            );
        }
    }

    #[test]
    fn test_data_file_format_parse_is_case_insensitive() {
        assert_eq!(
            "PARQUET".parse::<DataFileFormat>().unwrap(),
            DataFileFormat::Parquet
        );
        assert_eq!("Avro".parse::<DataFileFormat>().unwrap(), DataFileFormat::Avro);
    }

    #[test]
    fn test_data_file_format_rejects_unknown() {
        assert!("csv".parse::<DataFileFormat>().is_err());
        assert!("".parse::<DataFileFormat>().is_err());
    }
}