use std::io::Write;
use std::str::FromStr;
use std::{fmt, str};
pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::file::metadata::HeapSize;
use crate::parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
};
use crate::{thrift_enum, thrift_struct, thrift_union_all_empty, write_thrift_field};
use crate::errors::{ParquetError, Result};
// Declares the `Type` enum through the project's `thrift_enum!` macro.
//
// It lists eight type tags, `BOOLEAN` through `FIXED_LEN_BYTE_ARRAY`, numbered 0..=7.
// Elsewhere in this file a `Type` is passed as the `physical_type` argument when a
// sort order is chosen, and it can be parsed from its variant names (plus the alias
// "BINARY" for `BYTE_ARRAY`) via `FromStr`.
//
// NOTE(review): the integers are presumably the Thrift wire values; confirm against
// the `thrift_enum!` definition.
thrift_enum!(
enum Type {
BOOLEAN = 0;
INT32 = 1;
INT64 = 2;
INT96 = 3; FLOAT = 4;
DOUBLE = 5;
BYTE_ARRAY = 6;
FIXED_LEN_BYTE_ARRAY = 7;
}
);
// Declares the `ConvertedType` enum through the project's `thrift_enum!` macro.
//
// `NONE` (-1) is the value the `From<Option<LogicalType>>` conversion in this file
// produces when there is no logical type or the logical type has no counterpart here.
// The remaining variants are numbered 0..=21.
//
// NOTE(review): the integers are presumably the Thrift wire values; confirm against
// the `thrift_enum!` definition.
thrift_enum!(
enum ConvertedType {
NONE = -1;
UTF8 = 0;
MAP = 1;
MAP_KEY_VALUE = 2;
LIST = 3;
ENUM = 4;
DECIMAL = 5;
DATE = 6;
TIME_MILLIS = 7;
TIME_MICROS = 8;
TIMESTAMP_MILLIS = 9;
TIMESTAMP_MICROS = 10;
UINT_8 = 11;
UINT_16 = 12;
UINT_32 = 13;
UINT_64 = 14;
INT_8 = 15;
INT_16 = 16;
INT_32 = 17;
INT_64 = 18;
JSON = 19;
BSON = 20;
INTERVAL = 21;
}
);
// Declares the `TimeUnit` union through `thrift_union_all_empty!`.
//
// Each entry gives a union field id, a Thrift-side name and the Rust variant name; the
// rest of this file refers to the variants as `TimeUnit::MILLIS`, `TimeUnit::MICROS` and
// `TimeUnit::NANOS`. It is the `unit` of `LogicalType::Time` and `LogicalType::Timestamp`.
thrift_union_all_empty!(
union TimeUnit {
1: MilliSeconds MILLIS
2: MicroSeconds MICROS
3: NanoSeconds NANOS
}
);
// Payload struct for `LogicalType::Decimal`.
//
// The `LogicalType` reader decodes it for union field 5 and copies `scale` and
// `precision` into the enum variant; the writer builds it from the variant again.
thrift_struct!(
struct DecimalType {
1: required i32 scale
2: required i32 precision
}
);
// Payload struct for `LogicalType::Timestamp` (union field 8 in the reader/writer below).
thrift_struct!(
struct TimestampType {
1: required bool is_adjusted_to_u_t_c
2: required TimeUnit unit
}
);
// `LogicalType::Time` (union field 7) has the same two fields, so the same struct is
// reused for it under the name `TimeType`.
use TimestampType as TimeType;
// Payload struct for `LogicalType::Integer` (union field 10 in the reader/writer below).
thrift_struct!(
struct IntType {
1: required i8 bit_width
2: required bool is_signed
}
);
// Payload struct for `LogicalType::Variant` (union field 16 in the reader/writer below).
// Its single field is optional.
thrift_struct!(
struct VariantType {
1: optional i8 specification_version
}
);
// Payload struct for `LogicalType::Geometry` (union field 17 in the reader/writer below).
//
// `crs` is a string tied to the lifetime `'a`; the `LogicalType` reader turns it into an
// owned `String` with `to_owned()`.
thrift_struct!(
struct GeometryType<'a> {
1: optional string<'a> crs;
}
);
// Payload struct for `LogicalType::Geography` (union field 18 in the reader/writer below).
//
// Both fields are optional. When `algorithm` is absent, the `LogicalType` reader
// substitutes `EdgeInterpolationAlgorithm::SPHERICAL`.
thrift_struct!(
struct GeographyType<'a> {
1: optional string<'a> crs;
2: optional EdgeInterpolationAlgorithm algorithm;
}
);
/// Logical type annotation, encoded as a Thrift union with exactly one field set.
///
/// The field ids quoted on the variants are the ones used by the `ReadThrift` and
/// `WriteThrift` implementations in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogicalType {
/// Union field 1 (empty payload).
String,
/// Union field 2 (empty payload).
Map,
/// Union field 3 (empty payload).
List,
/// Union field 4 (empty payload).
Enum,
/// Union field 5, carried as a `DecimalType` struct.
Decimal {
/// Copied from `DecimalType::scale`.
scale: i32,
/// Copied from `DecimalType::precision`.
precision: i32,
},
/// Union field 6 (empty payload).
Date,
/// Union field 7, carried as a `TimeType` struct.
Time {
/// Copied from the payload's `is_adjusted_to_u_t_c` flag.
is_adjusted_to_u_t_c: bool,
/// Resolution of the value.
unit: TimeUnit,
},
/// Union field 8, carried as a `TimestampType` struct.
Timestamp {
/// Copied from the payload's `is_adjusted_to_u_t_c` flag.
is_adjusted_to_u_t_c: bool,
/// Resolution of the value.
unit: TimeUnit,
},
/// Union field 10, carried as an `IntType` struct.
Integer {
/// Width in bits; only 8, 16, 32 and 64 convert to a [`ConvertedType`], any other
/// width makes that conversion panic.
bit_width: i8,
/// Whether the integer is signed.
is_signed: bool,
},
/// Union field 11 (empty payload).
Unknown,
/// Union field 12 (empty payload).
Json,
/// Union field 13 (empty payload).
Bson,
/// Union field 14 (empty payload).
Uuid,
/// Union field 15 (empty payload).
Float16,
/// Union field 16, carried as a `VariantType` struct.
Variant {
/// Optional version number, passed through unchanged.
specification_version: Option<i8>,
},
/// Union field 17, carried as a `GeometryType` struct.
Geometry {
/// Optional CRS string, stored as an owned copy.
crs: Option<String>,
},
/// Union field 18, carried as a `GeographyType` struct.
Geography {
/// Optional CRS string, stored as an owned copy.
crs: Option<String>,
/// Edge interpolation algorithm. The reader always yields `Some(..)`, filling in
/// `SPHERICAL` when the field is absent.
algorithm: Option<EdgeInterpolationAlgorithm>,
},
/// Produced by the reader for any union field id it does not recognise. The writer
/// refuses to serialise it.
_Unknown {
/// The unrecognised union field id.
field_id: i16,
},
}
// Decodes a `LogicalType` from a Thrift union: one field is expected, and its id selects
// the variant.
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for LogicalType {
/// Reads one union field, maps its id to a variant, then requires the following field
/// header to be `Stop`.
///
/// # Errors
/// Fails when the very first header is `Stop` (empty union), when a second field follows
/// the first, or when any underlying protocol read/skip call fails.
fn read_thrift(prot: &mut R) -> Result<Self> {
let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("received empty union from remote LogicalType"));
}
// Ids 1-4, 6 and 11-15 map to variants without data, so their payload is only skipped.
// The remaining known ids decode a payload struct and copy its fields into the variant.
let ret = match field_ident.id {
1 => {
prot.skip_empty_struct()?;
Self::String
}
2 => {
prot.skip_empty_struct()?;
Self::Map
}
3 => {
prot.skip_empty_struct()?;
Self::List
}
4 => {
prot.skip_empty_struct()?;
Self::Enum
}
5 => {
let val = DecimalType::read_thrift(&mut *prot)?;
Self::Decimal {
scale: val.scale,
precision: val.precision,
}
}
6 => {
prot.skip_empty_struct()?;
Self::Date
}
7 => {
let val = TimeType::read_thrift(&mut *prot)?;
Self::Time {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
8 => {
let val = TimestampType::read_thrift(&mut *prot)?;
Self::Timestamp {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
// There is no arm for id 9; it is handled by the catch-all at the bottom.
10 => {
let val = IntType::read_thrift(&mut *prot)?;
Self::Integer {
is_signed: val.is_signed,
bit_width: val.bit_width,
}
}
11 => {
prot.skip_empty_struct()?;
Self::Unknown
}
12 => {
prot.skip_empty_struct()?;
Self::Json
}
13 => {
prot.skip_empty_struct()?;
Self::Bson
}
14 => {
prot.skip_empty_struct()?;
Self::Uuid
}
15 => {
prot.skip_empty_struct()?;
Self::Float16
}
16 => {
let val = VariantType::read_thrift(&mut *prot)?;
Self::Variant {
specification_version: val.specification_version,
}
}
17 => {
let val = GeometryType::read_thrift(&mut *prot)?;
Self::Geometry {
crs: val.crs.map(|s| s.to_owned()),
}
}
18 => {
let val = GeographyType::read_thrift(&mut *prot)?;
// A missing algorithm is replaced by SPHERICAL, so the decoded variant always
// holds `Some(..)`.
let algorithm = val
.algorithm
.unwrap_or(EdgeInterpolationAlgorithm::SPHERICAL);
Self::Geography {
crs: val.crs.map(|s| s.to_owned()),
algorithm: Some(algorithm),
}
}
// Unrecognised id: skip the payload according to the wire type from the header and
// keep the id so the caller can see what was encountered.
_ => {
prot.skip(field_ident.field_type)?;
Self::_Unknown {
field_id: field_ident.id,
}
}
};
// The id of the field just consumed is handed to the next header read.
// NOTE(review): presumably needed to resolve delta-encoded field ids; confirm against
// `read_field_begin`.
let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote LogicalType"
));
}
Ok(ret)
}
}
// Encodes a `LogicalType` as a Thrift union: one field whose id identifies the variant,
// followed by the struct end marker.
impl WriteThrift for LogicalType {
const ELEMENT_TYPE: ElementType = ElementType::Struct;
/// Writes the single union field for `self`, then ends the struct.
///
/// Variants without data are emitted with `write_empty_struct`; variants with data are
/// repacked into their payload struct and emitted with `write_thrift_field`. Every call
/// passes `0` as the last argument.
/// NOTE(review): that argument looks like the previously written field id; confirm
/// against the writer API.
///
/// # Errors
/// Returns a "not yet implemented" error for `LogicalType::_Unknown`, and propagates any
/// error from the underlying writer.
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
match self {
Self::String => {
writer.write_empty_struct(1, 0)?;
}
Self::Map => {
writer.write_empty_struct(2, 0)?;
}
Self::List => {
writer.write_empty_struct(3, 0)?;
}
Self::Enum => {
writer.write_empty_struct(4, 0)?;
}
Self::Decimal { scale, precision } => {
DecimalType {
scale: *scale,
precision: *precision,
}
.write_thrift_field(writer, 5, 0)?;
}
Self::Date => {
writer.write_empty_struct(6, 0)?;
}
Self::Time {
is_adjusted_to_u_t_c,
unit,
} => {
TimeType {
is_adjusted_to_u_t_c: *is_adjusted_to_u_t_c,
unit: *unit,
}
.write_thrift_field(writer, 7, 0)?;
}
Self::Timestamp {
is_adjusted_to_u_t_c,
unit,
} => {
TimestampType {
is_adjusted_to_u_t_c: *is_adjusted_to_u_t_c,
unit: *unit,
}
.write_thrift_field(writer, 8, 0)?;
}
// Field id 9 is not used by any variant.
Self::Integer {
bit_width,
is_signed,
} => {
IntType {
bit_width: *bit_width,
is_signed: *is_signed,
}
.write_thrift_field(writer, 10, 0)?;
}
Self::Unknown => {
writer.write_empty_struct(11, 0)?;
}
Self::Json => {
writer.write_empty_struct(12, 0)?;
}
Self::Bson => {
writer.write_empty_struct(13, 0)?;
}
Self::Uuid => {
writer.write_empty_struct(14, 0)?;
}
Self::Float16 => {
writer.write_empty_struct(15, 0)?;
}
Self::Variant {
specification_version,
} => {
VariantType {
specification_version: *specification_version,
}
.write_thrift_field(writer, 16, 0)?;
}
Self::Geometry { crs } => {
// The payload struct borrows the CRS string instead of cloning it.
GeometryType {
crs: crs.as_ref().map(|s| s.as_str()),
}
.write_thrift_field(writer, 17, 0)?;
}
Self::Geography { crs, algorithm } => {
GeographyType {
crs: crs.as_ref().map(|s| s.as_str()),
algorithm: *algorithm,
}
.write_thrift_field(writer, 18, 0)?;
}
// Only `_Unknown` reaches this arm: there is nothing meaningful to write for a
// field id this code does not understand.
_ => return Err(nyi_err!("logical type")),
}
writer.write_struct_end()
}
}
// Macro from the crate root, invoked here for `LogicalType` with the `Struct` field type.
// NOTE(review): presumably generates the `WriteThriftField` impl; confirm against the macro.
write_thrift_field!(LogicalType, FieldType::Struct);
// Declares the `FieldRepetitionType` enum through `thrift_enum!`, with the three values
// `REQUIRED`, `OPTIONAL` and `REPEATED` numbered 0..=2.
thrift_enum!(
enum FieldRepetitionType {
REQUIRED = 0;
OPTIONAL = 1;
REPEATED = 2;
}
);
/// Shorter name for [`FieldRepetitionType`]; the `FromStr` impl in this file is written
/// against this alias.
pub type Repetition = FieldRepetitionType;
// Declares the `Encoding` enum through `thrift_enum!`.
//
// The value 1 is not assigned to any variant. `EncodingMask` treats bit 1 as invalid
// accordingly, and `i32_to_encoding` has no arm for it.
// `BIT_PACKED` is marked deprecated; code in this file that names it carries
// `#[allow(deprecated)]`.
thrift_enum!(
enum Encoding {
PLAIN = 0;
PLAIN_DICTIONARY = 2;
RLE = 3;
#[deprecated(
since = "51.0.0",
note = "Please see documentation for compatibility issues and use the RLE/bit-packing hybrid encoding instead"
)]
BIT_PACKED = 4;
DELTA_BINARY_PACKED = 5;
DELTA_LENGTH_BYTE_ARRAY = 6;
DELTA_BYTE_ARRAY = 7;
RLE_DICTIONARY = 8;
BYTE_STREAM_SPLIT = 9;
}
);
impl FromStr for Encoding {
    type Err = ParquetError;

    /// Parses an encoding from its variant name.
    ///
    /// The name must be spelled either fully upper-case or fully lower-case; mixed case is
    /// rejected.
    ///
    /// # Errors
    /// Returns a general error naming the input when it matches no encoding.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let encoding = match s {
            "PLAIN" | "plain" => Encoding::PLAIN,
            "PLAIN_DICTIONARY" | "plain_dictionary" => Encoding::PLAIN_DICTIONARY,
            "RLE" | "rle" => Encoding::RLE,
            // Still parseable even though the variant itself is deprecated.
            #[allow(deprecated)]
            "BIT_PACKED" | "bit_packed" => Encoding::BIT_PACKED,
            "DELTA_BINARY_PACKED" | "delta_binary_packed" => Encoding::DELTA_BINARY_PACKED,
            "DELTA_LENGTH_BYTE_ARRAY" | "delta_length_byte_array" => {
                Encoding::DELTA_LENGTH_BYTE_ARRAY
            }
            "DELTA_BYTE_ARRAY" | "delta_byte_array" => Encoding::DELTA_BYTE_ARRAY,
            "RLE_DICTIONARY" | "rle_dictionary" => Encoding::RLE_DICTIONARY,
            "BYTE_STREAM_SPLIT" | "byte_stream_split" => Encoding::BYTE_STREAM_SPLIT,
            _ => return Err(general_err!("unknown encoding: {}", s)),
        };
        Ok(encoding)
    }
}
/// Set of [`Encoding`] values packed into an `i32`: bit `n` is set when the encoding whose
/// discriminant is `n` belongs to the set. The default value is the empty set.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EncodingMask(i32);
impl EncodingMask {
const MAX_ENCODING: i32 = Encoding::MAX_DISCRIMINANT;
const ALLOWED_MASK: u32 =
!(1u32 << (EncodingMask::MAX_ENCODING as u32 + 1)).wrapping_sub(1) | 1 << 1;
pub fn try_new(val: i32) -> Result<Self> {
if val as u32 & Self::ALLOWED_MASK != 0 {
return Err(general_err!("Attempt to create invalid mask: 0x{:x}", val));
}
Ok(Self(val))
}
pub fn as_i32(&self) -> i32 {
self.0
}
pub fn new_from_encodings<'a>(encodings: impl Iterator<Item = &'a Encoding>) -> Self {
let mut mask = 0;
for &e in encodings {
mask |= 1 << (e as i32);
}
Self(mask)
}
pub fn insert(&mut self, val: Encoding) {
self.0 |= 1 << (val as i32);
}
pub fn is_set(&self, val: Encoding) -> bool {
self.0 & (1 << (val as i32)) != 0
}
pub fn is_only(&self, val: Encoding) -> bool {
self.0 == (1 << (val as i32))
}
pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
encodings.all(|&e| self.is_set(e))
}
pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
Self::mask_to_encodings_iter(self.0)
}
fn mask_to_encodings_iter(mask: i32) -> impl Iterator<Item = Encoding> {
(0..=Self::MAX_ENCODING)
.filter(move |i| mask & (1 << i) != 0)
.map(i32_to_encoding)
}
}
impl HeapSize for EncodingMask {
    /// Always zero: the mask is a plain `i32` wrapper and owns no heap memory.
    fn heap_size(&self) -> usize {
        0
    }
}
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EncodingMask {
    /// Reads a Thrift list of [`Encoding`] values and folds it into a mask, one bit per
    /// distinct encoding. Duplicates in the list collapse onto the same bit.
    ///
    /// # Errors
    /// Propagates any error from reading the list header or one of its elements.
    fn read_thrift(prot: &mut R) -> Result<Self> {
        let list_ident = prot.read_list_begin()?;
        // Start from the empty set and add one encoding per list element.
        let mut collected = Self::default();
        for _ in 0..list_ident.size {
            collected.insert(Encoding::read_thrift(prot)?);
        }
        Ok(collected)
    }
}
/// Maps an [`Encoding`] discriminant back to its variant.
///
/// # Panics
/// Panics for any value that is not the discriminant of a variant (negative numbers, `1`,
/// and everything above `9`).
// `BIT_PACKED` is deprecated but must remain representable here.
#[allow(deprecated)]
fn i32_to_encoding(val: i32) -> Encoding {
    // Indexed by discriminant; slot 1 is empty because no variant uses that value.
    const BY_DISCRIMINANT: [Option<Encoding>; 10] = [
        Some(Encoding::PLAIN),
        None,
        Some(Encoding::PLAIN_DICTIONARY),
        Some(Encoding::RLE),
        Some(Encoding::BIT_PACKED),
        Some(Encoding::DELTA_BINARY_PACKED),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
        Some(Encoding::RLE_DICTIONARY),
        Some(Encoding::BYTE_STREAM_SPLIT),
    ];

    let known = if val < 0 {
        None
    } else {
        BY_DISCRIMINANT.get(val as usize).copied().flatten()
    };
    match known {
        Some(encoding) => encoding,
        None => panic!("Impossible encoding {val}"),
    }
}
/// Compression codec, with a level attached to the codecs that take one.
///
/// On the Thrift wire only the codec id (0..=7 in declaration order) is read and written.
/// A level is never serialised: reading always yields the default level for `GZIP`,
/// `BROTLI` and `ZSTD`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum Compression {
/// Codec id 0; takes no level.
UNCOMPRESSED,
/// Codec id 1; takes no level.
SNAPPY,
/// Codec id 2, with its compression level.
GZIP(GzipLevel),
/// Codec id 3; takes no level.
LZO,
/// Codec id 4, with its compression level.
BROTLI(BrotliLevel),
/// Codec id 5; takes no level.
LZ4,
/// Codec id 6, with its compression level.
ZSTD(ZstdLevel),
/// Codec id 7; takes no level.
LZ4_RAW,
}
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for Compression {
fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
Ok(match val {
0 => Self::UNCOMPRESSED,
1 => Self::SNAPPY,
2 => Self::GZIP(Default::default()),
3 => Self::LZO,
4 => Self::BROTLI(Default::default()),
5 => Self::LZ4,
6 => Self::ZSTD(Default::default()),
7 => Self::LZ4_RAW,
_ => return Err(general_err!("Unexpected CompressionCodec {}", val)),
})
}
}
impl WriteThrift for Compression {
    const ELEMENT_TYPE: ElementType = ElementType::I32;

    /// Writes the codec id as an `i32`. Any compression level held by the variant is
    /// dropped; only the codec identity is serialised.
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let codec_id: i32 = match self {
            Self::UNCOMPRESSED => 0,
            Self::SNAPPY => 1,
            Self::GZIP(_level) => 2,
            Self::LZO => 3,
            Self::BROTLI(_level) => 4,
            Self::LZ4 => 5,
            Self::ZSTD(_level) => 6,
            Self::LZ4_RAW => 7,
        };
        writer.write_i32(codec_id)
    }
}
write_thrift_field!(Compression, FieldType::I32);
impl Compression {
    /// Returns the codec name without any level, e.g. the `Debug` text up to (but not
    /// including) the first `(`.
    pub(crate) fn codec_to_string(self) -> String {
        let rendered = format!("{self:?}");
        match rendered.split_once('(') {
            Some((codec_name, _level)) => codec_name.to_owned(),
            // No level attached: the whole text is the codec name.
            None => rendered,
        }
    }
}
/// Splits a compression setting such as `"GZIP(6)"` into the codec name and its level.
///
/// A setting without `(` is returned unchanged with `None` as the level. Otherwise the text
/// after the first `(` must be an unsigned integer followed by a closing `)`.
///
/// # Errors
/// Returns `ParquetError::General` when the level part lacks the closing parenthesis or
/// is not a valid `u32`. This includes an empty level part such as `"GZIP("`, which
/// previously caused a panic instead of an error.
fn split_compression_string(str_setting: &str) -> Result<(&str, Option<u32>), ParquetError> {
    match str_setting.split_once('(') {
        None => Ok((str_setting, None)),
        Some((codec, level_str)) => {
            // `strip_suffix` both checks for the closing `)` and avoids slicing by byte
            // offset, so empty or non-ASCII input can no longer panic.
            let level = level_str
                .strip_suffix(')')
                .and_then(|digits| digits.parse::<u32>().ok())
                .ok_or_else(|| {
                    ParquetError::General(format!("invalid compression level: {level_str}"))
                })?;
            Ok((codec, Some(level)))
        }
    }
}
/// Verifies that no compression level was supplied, for codecs that do not take one.
///
/// # Errors
/// Returns `ParquetError::General` when `level` holds a value.
fn check_level_is_none(level: &Option<u32>) -> Result<(), ParquetError> {
    match level {
        None => Ok(()),
        Some(_) => Err(ParquetError::General(
            "compression level is not supported".to_string(),
        )),
    }
}
/// Unwraps the compression level for a codec that cannot be used without one.
///
/// # Errors
/// Returns `ParquetError::General` naming `codec` when `level` is `None`.
fn require_level(codec: &str, level: Option<u32>) -> Result<u32, ParquetError> {
    // Build the message lazily so the common success path does not allocate a string that
    // is immediately thrown away.
    level.ok_or_else(|| ParquetError::General(format!("{codec} requires a compression level")))
}
impl FromStr for Compression {
    type Err = ParquetError;

    /// Parses settings of the form `CODEC` or `CODEC(level)`.
    ///
    /// Codec names are accepted fully upper-case or fully lower-case. `GZIP`, `BROTLI` and
    /// `ZSTD` require a level; every other codec rejects one.
    ///
    /// # Errors
    /// Fails when the setting cannot be split, when a level is missing or unexpected,
    /// when the level type's `try_new` rejects the level, or when the codec name is not
    /// recognised.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let (codec, level) = split_compression_string(s)?;
        match codec {
            "UNCOMPRESSED" | "uncompressed" => {
                check_level_is_none(&level)?;
                Ok(Compression::UNCOMPRESSED)
            }
            "SNAPPY" | "snappy" => {
                check_level_is_none(&level)?;
                Ok(Compression::SNAPPY)
            }
            "GZIP" | "gzip" => {
                let requested = require_level(codec, level)?;
                let gzip_level = GzipLevel::try_new(requested)?;
                Ok(Compression::GZIP(gzip_level))
            }
            "LZO" | "lzo" => {
                check_level_is_none(&level)?;
                Ok(Compression::LZO)
            }
            "BROTLI" | "brotli" => {
                let requested = require_level(codec, level)?;
                let brotli_level = BrotliLevel::try_new(requested)?;
                Ok(Compression::BROTLI(brotli_level))
            }
            "LZ4" | "lz4" => {
                check_level_is_none(&level)?;
                Ok(Compression::LZ4)
            }
            "ZSTD" | "zstd" => {
                let requested = require_level(codec, level)?;
                // ZstdLevel is built from an i32, hence the cast from the parsed u32.
                let zstd_level = ZstdLevel::try_new(requested as i32)?;
                Ok(Compression::ZSTD(zstd_level))
            }
            "LZ4_RAW" | "lz4_raw" => {
                check_level_is_none(&level)?;
                Ok(Compression::LZ4_RAW)
            }
            _ => Err(ParquetError::General(format!(
                "unsupport compression {codec}"
            ))),
        }
    }
}
// Declares the `PageType` enum through `thrift_enum!`, with four page kinds numbered 0..=3.
// NOTE(review): the integers are presumably the Thrift wire values; confirm against the macro.
thrift_enum!(
enum PageType {
DATA_PAGE = 0;
INDEX_PAGE = 1;
DICTIONARY_PAGE = 2;
DATA_PAGE_V2 = 3;
}
);
// Declares the `BoundaryOrder` enum through `thrift_enum!`, with the three values
// `UNORDERED`, `ASCENDING` and `DESCENDING` numbered 0..=2.
// NOTE(review): the integers are presumably the Thrift wire values; confirm against the macro.
thrift_enum!(
enum BoundaryOrder {
UNORDERED = 0;
ASCENDING = 1;
DESCENDING = 2;
}
);
/// Edge interpolation algorithm attached to `LogicalType::Geography`.
///
/// The five named variants use the integer values 0..=4 on the Thrift wire. Any other
/// value read from the wire is kept in `_Unknown` and written back unchanged.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[repr(i32)]
#[derive(Default)]
pub enum EdgeInterpolationAlgorithm {
/// Wire value 0. This is the `Default`, and what the `LogicalType` reader assumes when
/// the algorithm field is absent.
#[default]
SPHERICAL = 0,
/// Wire value 1.
VINCENTY = 1,
/// Wire value 2.
THOMAS = 2,
/// Wire value 3.
ANDOYER = 3,
/// Wire value 4.
KARNEY = 4,
/// A wire value this code does not recognise, preserved as read.
_Unknown(i32),
}
#[cfg(feature = "geospatial")]
impl EdgeInterpolationAlgorithm {
    /// Converts to the matching `parquet_geospatial::WkbEdges` value.
    ///
    /// # Errors
    /// Returns a general error for `_Unknown`, which has no counterpart.
    pub fn try_as_edges(&self) -> Result<parquet_geospatial::WkbEdges> {
        use parquet_geospatial::WkbEdges;

        let edges = match self {
            Self::SPHERICAL => WkbEdges::Spherical,
            Self::VINCENTY => WkbEdges::Vincenty,
            Self::THOMAS => WkbEdges::Thomas,
            Self::ANDOYER => WkbEdges::Andoyer,
            Self::KARNEY => WkbEdges::Karney,
            Self::_Unknown(_) => {
                return Err(general_err!(
                    "Unknown edge interpolation algorithm: {}",
                    self
                ));
            }
        };
        Ok(edges)
    }
}
impl fmt::Display for EdgeInterpolationAlgorithm {
    /// Displays the value exactly as its `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{self:?}")
    }
}
#[cfg(feature = "geospatial")]
impl From<parquet_geospatial::WkbEdges> for EdgeInterpolationAlgorithm {
    /// Maps each `WkbEdges` value to the algorithm of the same name.
    fn from(value: parquet_geospatial::WkbEdges) -> Self {
        use parquet_geospatial::WkbEdges;

        match value {
            WkbEdges::Spherical => Self::SPHERICAL,
            WkbEdges::Vincenty => Self::VINCENTY,
            WkbEdges::Thomas => Self::THOMAS,
            WkbEdges::Andoyer => Self::ANDOYER,
            WkbEdges::Karney => Self::KARNEY,
        }
    }
}
impl FromStr for EdgeInterpolationAlgorithm {
    type Err = ParquetError;

    /// Parses an algorithm name, ignoring ASCII case.
    ///
    /// # Errors
    /// Returns a general error for names that match no algorithm; the message quotes the
    /// upper-cased form of the input.
    fn from_str(s: &str) -> Result<Self> {
        let upper = s.to_ascii_uppercase();
        let algorithm = match upper.as_str() {
            "SPHERICAL" => EdgeInterpolationAlgorithm::SPHERICAL,
            "VINCENTY" => EdgeInterpolationAlgorithm::VINCENTY,
            "THOMAS" => EdgeInterpolationAlgorithm::THOMAS,
            "ANDOYER" => EdgeInterpolationAlgorithm::ANDOYER,
            "KARNEY" => EdgeInterpolationAlgorithm::KARNEY,
            unknown => {
                return Err(general_err!(
                    "Unknown edge interpolation algorithm: {}",
                    unknown
                ));
            }
        };
        Ok(algorithm)
    }
}
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EdgeInterpolationAlgorithm {
    /// Reads an `i32` and maps 0..=4 to the named algorithms. Any other value is kept in
    /// `_Unknown` rather than rejected.
    ///
    /// # Errors
    /// Only fails when reading the `i32` fails.
    fn read_thrift(prot: &mut R) -> Result<Self> {
        let algorithm = match prot.read_i32()? {
            0 => Self::SPHERICAL,
            1 => Self::VINCENTY,
            2 => Self::THOMAS,
            3 => Self::ANDOYER,
            4 => Self::KARNEY,
            unrecognised => Self::_Unknown(unrecognised),
        };
        Ok(algorithm)
    }
}
impl WriteThrift for EdgeInterpolationAlgorithm {
    const ELEMENT_TYPE: ElementType = ElementType::I32;

    /// Writes the algorithm as an `i32`: 0..=4 for the named variants, or the stored raw
    /// value for `_Unknown`.
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let wire_value = match self {
            // Round-trip whatever value was originally read.
            Self::_Unknown(raw) => *raw,
            Self::SPHERICAL => 0,
            Self::VINCENTY => 1,
            Self::THOMAS => 2,
            Self::ANDOYER => 3,
            Self::KARNEY => 4,
        };
        writer.write_i32(wire_value)
    }
}
write_thrift_field!(EdgeInterpolationAlgorithm, FieldType::I32);
// Declares the `BloomFilterAlgorithm` union through `thrift_union_all_empty!`. It has a
// single alternative: field id 1, Thrift-side name `SplitBlockAlgorithm`, Rust variant `BLOCK`.
thrift_union_all_empty!(
union BloomFilterAlgorithm {
1: SplitBlockAlgorithm BLOCK;
}
);
// Declares the `BloomFilterHash` union through `thrift_union_all_empty!`. It has a single
// alternative: field id 1, Thrift-side name `XxHash`, Rust variant `XXHASH`.
thrift_union_all_empty!(
union BloomFilterHash {
1: XxHash XXHASH;
}
);
// Declares the `BloomFilterCompression` union through `thrift_union_all_empty!`. It has a
// single alternative: field id 1, Thrift-side name `Uncompressed`, Rust variant `UNCOMPRESSED`.
thrift_union_all_empty!(
union BloomFilterCompression {
1: Uncompressed UNCOMPRESSED;
}
);
/// How values of a column compare, as derived by `ColumnOrder` from the column's logical,
/// converted and physical types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum SortOrder {
/// Signed comparison.
SIGNED,
/// Unsigned comparison.
UNSIGNED,
/// No comparison is defined.
UNDEFINED,
}
impl SortOrder {
    /// Returns `true` only for [`SortOrder::SIGNED`].
    pub fn is_signed(&self) -> bool {
        *self == Self::SIGNED
    }
}
/// Ordering declared for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum ColumnOrder {
/// Ordering follows the column's type; the payload is the resulting sort order. This
/// is the only variant the Thrift writer in this file can serialise. The reader creates
/// it with a placeholder `SortOrder::SIGNED`.
TYPE_DEFINED_ORDER(SortOrder),
/// No order declared; `sort_order()` reports `SortOrder::SIGNED` for it.
UNDEFINED,
/// Produced by the Thrift reader for a union field id other than 1; `sort_order()`
/// reports `SortOrder::UNDEFINED` for it.
UNKNOWN,
}
impl ColumnOrder {
#[deprecated(
since = "57.1.0",
note = "use `ColumnOrder::sort_order_for_type` instead"
)]
pub fn get_sort_order(
logical_type: Option<LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
) -> SortOrder {
Self::sort_order_for_type(logical_type.as_ref(), converted_type, physical_type)
}
pub fn sort_order_for_type(
logical_type: Option<&LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
) -> SortOrder {
match logical_type {
Some(logical) => match logical {
LogicalType::String | LogicalType::Enum | LogicalType::Json | LogicalType::Bson => {
SortOrder::UNSIGNED
}
LogicalType::Integer { is_signed, .. } => match is_signed {
true => SortOrder::SIGNED,
false => SortOrder::UNSIGNED,
},
LogicalType::Map | LogicalType::List => SortOrder::UNDEFINED,
LogicalType::Decimal { .. } => SortOrder::SIGNED,
LogicalType::Date => SortOrder::SIGNED,
LogicalType::Time { .. } => SortOrder::SIGNED,
LogicalType::Timestamp { .. } => SortOrder::SIGNED,
LogicalType::Unknown => SortOrder::UNDEFINED,
LogicalType::Uuid => SortOrder::UNSIGNED,
LogicalType::Float16 => SortOrder::SIGNED,
LogicalType::Variant { .. }
| LogicalType::Geometry { .. }
| LogicalType::Geography { .. }
| LogicalType::_Unknown { .. } => SortOrder::UNDEFINED,
},
None => Self::get_converted_sort_order(converted_type, physical_type),
}
}
fn get_converted_sort_order(converted_type: ConvertedType, physical_type: Type) -> SortOrder {
match converted_type {
ConvertedType::UTF8
| ConvertedType::JSON
| ConvertedType::BSON
| ConvertedType::ENUM => SortOrder::UNSIGNED,
ConvertedType::INT_8
| ConvertedType::INT_16
| ConvertedType::INT_32
| ConvertedType::INT_64 => SortOrder::SIGNED,
ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
| ConvertedType::UINT_64 => SortOrder::UNSIGNED,
ConvertedType::DECIMAL => SortOrder::SIGNED,
ConvertedType::DATE => SortOrder::SIGNED,
ConvertedType::TIME_MILLIS
| ConvertedType::TIME_MICROS
| ConvertedType::TIMESTAMP_MILLIS
| ConvertedType::TIMESTAMP_MICROS => SortOrder::SIGNED,
ConvertedType::INTERVAL => SortOrder::UNDEFINED,
ConvertedType::LIST | ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
SortOrder::UNDEFINED
}
ConvertedType::NONE => Self::get_default_sort_order(physical_type),
}
}
fn get_default_sort_order(physical_type: Type) -> SortOrder {
match physical_type {
Type::BOOLEAN => SortOrder::UNSIGNED,
Type::INT32 | Type::INT64 => SortOrder::SIGNED,
Type::INT96 => SortOrder::UNDEFINED,
Type::FLOAT | Type::DOUBLE => SortOrder::SIGNED,
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => SortOrder::UNSIGNED,
}
}
pub fn sort_order(&self) -> SortOrder {
match *self {
ColumnOrder::TYPE_DEFINED_ORDER(order) => order,
ColumnOrder::UNDEFINED => SortOrder::SIGNED,
ColumnOrder::UNKNOWN => SortOrder::UNDEFINED,
}
}
}
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder {
    /// Decodes a `ColumnOrder` union: exactly one field, followed by `Stop`.
    ///
    /// Field id 1 becomes `TYPE_DEFINED_ORDER` holding a placeholder `SortOrder::SIGNED`
    /// (nothing read here says what the real order is). Any other id is skipped and
    /// reported as `UNKNOWN`.
    ///
    /// # Errors
    /// Fails for an empty union, for a union with more than one field, or when a protocol
    /// call fails.
    fn read_thrift(prot: &mut R) -> Result<Self> {
        let first = prot.read_field_begin(0)?;
        if first.field_type == FieldType::Stop {
            return Err(general_err!("Received empty union from remote ColumnOrder"));
        }

        let order = if first.id == 1 {
            prot.skip_empty_struct()?;
            Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
        } else {
            prot.skip(first.field_type)?;
            Self::UNKNOWN
        };

        // Anything other than `Stop` here means the union carried a second field.
        let trailer = prot.read_field_begin(first.id)?;
        if trailer.field_type == FieldType::Stop {
            Ok(order)
        } else {
            Err(general_err!(
                "Received multiple fields for union from remote ColumnOrder"
            ))
        }
    }
}
impl WriteThrift for ColumnOrder {
const ELEMENT_TYPE: ElementType = ElementType::Struct;
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
match *self {
Self::TYPE_DEFINED_ORDER(_) => {
writer.write_field_begin(FieldType::Struct, 1, 0)?;
writer.write_struct_end()?;
}
_ => return Err(general_err!("Attempt to write undefined ColumnOrder")),
}
writer.write_struct_end()
}
}
impl fmt::Display for Compression {
    /// Displays the codec exactly as its `Debug` representation, level included.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}
impl fmt::Display for SortOrder {
    /// Displays the variant name, identical to the `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}
impl fmt::Display for ColumnOrder {
    /// Displays the value exactly as its `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}
impl From<Option<LogicalType>> for ConvertedType {
fn from(value: Option<LogicalType>) -> Self {
match value {
Some(value) => match value {
LogicalType::String => ConvertedType::UTF8,
LogicalType::Map => ConvertedType::MAP,
LogicalType::List => ConvertedType::LIST,
LogicalType::Enum => ConvertedType::ENUM,
LogicalType::Decimal { .. } => ConvertedType::DECIMAL,
LogicalType::Date => ConvertedType::DATE,
LogicalType::Time { unit, .. } => match unit {
TimeUnit::MILLIS => ConvertedType::TIME_MILLIS,
TimeUnit::MICROS => ConvertedType::TIME_MICROS,
TimeUnit::NANOS => ConvertedType::NONE,
},
LogicalType::Timestamp { unit, .. } => match unit {
TimeUnit::MILLIS => ConvertedType::TIMESTAMP_MILLIS,
TimeUnit::MICROS => ConvertedType::TIMESTAMP_MICROS,
TimeUnit::NANOS => ConvertedType::NONE,
},
LogicalType::Integer {
bit_width,
is_signed,
} => match (bit_width, is_signed) {
(8, true) => ConvertedType::INT_8,
(16, true) => ConvertedType::INT_16,
(32, true) => ConvertedType::INT_32,
(64, true) => ConvertedType::INT_64,
(8, false) => ConvertedType::UINT_8,
(16, false) => ConvertedType::UINT_16,
(32, false) => ConvertedType::UINT_32,
(64, false) => ConvertedType::UINT_64,
(bit_width, is_signed) => panic!(
"Integer type bit_width={bit_width}, signed={is_signed} is not supported"
),
},
LogicalType::Json => ConvertedType::JSON,
LogicalType::Bson => ConvertedType::BSON,
LogicalType::Uuid
| LogicalType::Float16
| LogicalType::Variant { .. }
| LogicalType::Geometry { .. }
| LogicalType::Geography { .. }
| LogicalType::_Unknown { .. }
| LogicalType::Unknown => ConvertedType::NONE,
},
None => ConvertedType::NONE,
}
}
}
impl str::FromStr for Repetition {
    type Err = ParquetError;

    /// Parses the upper-case variant name (`REQUIRED`, `OPTIONAL` or `REPEATED`).
    ///
    /// # Errors
    /// Returns a general error quoting the input for any other string.
    fn from_str(s: &str) -> Result<Self> {
        let repetition = match s {
            "REQUIRED" => Repetition::REQUIRED,
            "OPTIONAL" => Repetition::OPTIONAL,
            "REPEATED" => Repetition::REPEATED,
            unrecognised => {
                return Err(general_err!("Invalid parquet repetition {}", unrecognised));
            }
        };
        Ok(repetition)
    }
}
impl str::FromStr for Type {
    type Err = ParquetError;

    /// Parses the upper-case variant name. `BINARY` is accepted as another spelling of
    /// `BYTE_ARRAY`.
    ///
    /// # Errors
    /// Returns a general error quoting the input for any other string.
    fn from_str(s: &str) -> Result<Self> {
        let parsed = match s {
            "BOOLEAN" => Type::BOOLEAN,
            "INT32" => Type::INT32,
            "INT64" => Type::INT64,
            "INT96" => Type::INT96,
            "FLOAT" => Type::FLOAT,
            "DOUBLE" => Type::DOUBLE,
            "BYTE_ARRAY" | "BINARY" => Type::BYTE_ARRAY,
            "FIXED_LEN_BYTE_ARRAY" => Type::FIXED_LEN_BYTE_ARRAY,
            unrecognised => return Err(general_err!("Invalid parquet type {}", unrecognised)),
        };
        Ok(parsed)
    }
}
impl str::FromStr for ConvertedType {
    type Err = ParquetError;

    /// Parses the upper-case variant name, e.g. `"TIMESTAMP_MILLIS"`.
    ///
    /// # Errors
    /// Returns a general error quoting the input for any other string.
    fn from_str(s: &str) -> Result<Self> {
        let converted = match s {
            "NONE" => ConvertedType::NONE,
            "UTF8" => ConvertedType::UTF8,
            "MAP" => ConvertedType::MAP,
            "MAP_KEY_VALUE" => ConvertedType::MAP_KEY_VALUE,
            "LIST" => ConvertedType::LIST,
            "ENUM" => ConvertedType::ENUM,
            "DECIMAL" => ConvertedType::DECIMAL,
            "DATE" => ConvertedType::DATE,
            "TIME_MILLIS" => ConvertedType::TIME_MILLIS,
            "TIME_MICROS" => ConvertedType::TIME_MICROS,
            "TIMESTAMP_MILLIS" => ConvertedType::TIMESTAMP_MILLIS,
            "TIMESTAMP_MICROS" => ConvertedType::TIMESTAMP_MICROS,
            "UINT_8" => ConvertedType::UINT_8,
            "UINT_16" => ConvertedType::UINT_16,
            "UINT_32" => ConvertedType::UINT_32,
            "UINT_64" => ConvertedType::UINT_64,
            "INT_8" => ConvertedType::INT_8,
            "INT_16" => ConvertedType::INT_16,
            "INT_32" => ConvertedType::INT_32,
            "INT_64" => ConvertedType::INT_64,
            "JSON" => ConvertedType::JSON,
            "BSON" => ConvertedType::BSON,
            "INTERVAL" => ConvertedType::INTERVAL,
            unrecognised => {
                return Err(general_err!(
                    "Invalid parquet converted type {}",
                    unrecognised
                ));
            }
        };
        Ok(converted)
    }
}
impl str::FromStr for LogicalType {
    type Err = ParquetError;

    /// Parses an upper-case logical type name.
    ///
    /// Only the name is parsed, so variants with fields come back with fixed values:
    /// `INTEGER` is 8-bit unsigned, `DECIMAL` has precision and scale `-1`, `TIME` and
    /// `TIMESTAMP` are non-UTC milliseconds, `GEOMETRY` has no CRS, and `GEOGRAPHY` has no
    /// CRS and the `SPHERICAL` algorithm.
    ///
    /// # Errors
    /// Returns a general error for `"INTERVAL"` (not supported) and for any string that
    /// is not a recognised name.
    fn from_str(s: &str) -> Result<Self> {
        let logical = match s {
            "STRING" => LogicalType::String,
            "MAP" => LogicalType::Map,
            "LIST" => LogicalType::List,
            "ENUM" => LogicalType::Enum,
            "DATE" => LogicalType::Date,
            "JSON" => LogicalType::Json,
            "BSON" => LogicalType::Bson,
            "UUID" => LogicalType::Uuid,
            "UNKNOWN" => LogicalType::Unknown,
            "FLOAT16" => LogicalType::Float16,
            "INTEGER" => LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            },
            "DECIMAL" => LogicalType::Decimal {
                precision: -1,
                scale: -1,
            },
            "TIME" => LogicalType::Time {
                is_adjusted_to_u_t_c: false,
                unit: TimeUnit::MILLIS,
            },
            "TIMESTAMP" => LogicalType::Timestamp {
                is_adjusted_to_u_t_c: false,
                unit: TimeUnit::MILLIS,
            },
            "GEOMETRY" => LogicalType::Geometry { crs: None },
            "GEOGRAPHY" => LogicalType::Geography {
                crs: None,
                algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL),
            },
            "INTERVAL" => {
                return Err(general_err!(
                    "Interval parquet logical type not yet supported"
                ));
            }
            unrecognised => {
                return Err(general_err!(
                    "Invalid parquet logical type {}",
                    unrecognised
                ));
            }
        };
        Ok(logical)
    }
}
#[cfg(test)]
#[allow(deprecated)] mod tests {
use super::*;
use crate::parquet_thrift::{ThriftSliceInputProtocol, tests::test_roundtrip};
#[test]
fn test_display_type() {
assert_eq!(Type::BOOLEAN.to_string(), "BOOLEAN");
assert_eq!(Type::INT32.to_string(), "INT32");
assert_eq!(Type::INT64.to_string(), "INT64");
assert_eq!(Type::INT96.to_string(), "INT96");
assert_eq!(Type::FLOAT.to_string(), "FLOAT");
assert_eq!(Type::DOUBLE.to_string(), "DOUBLE");
assert_eq!(Type::BYTE_ARRAY.to_string(), "BYTE_ARRAY");
assert_eq!(
Type::FIXED_LEN_BYTE_ARRAY.to_string(),
"FIXED_LEN_BYTE_ARRAY"
);
}
#[test]
fn test_from_string_into_type() {
assert_eq!(
Type::BOOLEAN.to_string().parse::<Type>().unwrap(),
Type::BOOLEAN
);
assert_eq!(
Type::INT32.to_string().parse::<Type>().unwrap(),
Type::INT32
);
assert_eq!(
Type::INT64.to_string().parse::<Type>().unwrap(),
Type::INT64
);
assert_eq!(
Type::INT96.to_string().parse::<Type>().unwrap(),
Type::INT96
);
assert_eq!(
Type::FLOAT.to_string().parse::<Type>().unwrap(),
Type::FLOAT
);
assert_eq!(
Type::DOUBLE.to_string().parse::<Type>().unwrap(),
Type::DOUBLE
);
assert_eq!(
Type::BYTE_ARRAY.to_string().parse::<Type>().unwrap(),
Type::BYTE_ARRAY
);
assert_eq!("BINARY".parse::<Type>().unwrap(), Type::BYTE_ARRAY);
assert_eq!(
Type::FIXED_LEN_BYTE_ARRAY
.to_string()
.parse::<Type>()
.unwrap(),
Type::FIXED_LEN_BYTE_ARRAY
);
}
#[test]
fn test_converted_type_roundtrip() {
test_roundtrip(ConvertedType::UTF8);
test_roundtrip(ConvertedType::MAP);
test_roundtrip(ConvertedType::MAP_KEY_VALUE);
test_roundtrip(ConvertedType::LIST);
test_roundtrip(ConvertedType::ENUM);
test_roundtrip(ConvertedType::DECIMAL);
test_roundtrip(ConvertedType::DATE);
test_roundtrip(ConvertedType::TIME_MILLIS);
test_roundtrip(ConvertedType::TIME_MICROS);
test_roundtrip(ConvertedType::TIMESTAMP_MILLIS);
test_roundtrip(ConvertedType::TIMESTAMP_MICROS);
test_roundtrip(ConvertedType::UINT_8);
test_roundtrip(ConvertedType::UINT_16);
test_roundtrip(ConvertedType::UINT_32);
test_roundtrip(ConvertedType::UINT_64);
test_roundtrip(ConvertedType::INT_8);
test_roundtrip(ConvertedType::INT_16);
test_roundtrip(ConvertedType::INT_32);
test_roundtrip(ConvertedType::INT_64);
test_roundtrip(ConvertedType::JSON);
test_roundtrip(ConvertedType::BSON);
test_roundtrip(ConvertedType::INTERVAL);
}
#[test]
fn test_read_invalid_converted_type() {
let mut prot = ThriftSliceInputProtocol::new(&[0x7eu8]);
let res = ConvertedType::read_thrift(&mut prot);
assert!(res.is_err());
assert_eq!(
res.unwrap_err().to_string(),
"Parquet error: Unexpected ConvertedType 63"
);
}
#[test]
fn test_display_converted_type() {
assert_eq!(ConvertedType::NONE.to_string(), "NONE");
assert_eq!(ConvertedType::UTF8.to_string(), "UTF8");
assert_eq!(ConvertedType::MAP.to_string(), "MAP");
assert_eq!(ConvertedType::MAP_KEY_VALUE.to_string(), "MAP_KEY_VALUE");
assert_eq!(ConvertedType::LIST.to_string(), "LIST");
assert_eq!(ConvertedType::ENUM.to_string(), "ENUM");
assert_eq!(ConvertedType::DECIMAL.to_string(), "DECIMAL");
assert_eq!(ConvertedType::DATE.to_string(), "DATE");
assert_eq!(ConvertedType::TIME_MILLIS.to_string(), "TIME_MILLIS");
assert_eq!(ConvertedType::DATE.to_string(), "DATE");
assert_eq!(ConvertedType::TIME_MICROS.to_string(), "TIME_MICROS");
assert_eq!(
ConvertedType::TIMESTAMP_MILLIS.to_string(),
"TIMESTAMP_MILLIS"
);
assert_eq!(
ConvertedType::TIMESTAMP_MICROS.to_string(),
"TIMESTAMP_MICROS"
);
assert_eq!(ConvertedType::UINT_8.to_string(), "UINT_8");
assert_eq!(ConvertedType::UINT_16.to_string(), "UINT_16");
assert_eq!(ConvertedType::UINT_32.to_string(), "UINT_32");
assert_eq!(ConvertedType::UINT_64.to_string(), "UINT_64");
assert_eq!(ConvertedType::INT_8.to_string(), "INT_8");
assert_eq!(ConvertedType::INT_16.to_string(), "INT_16");
assert_eq!(ConvertedType::INT_32.to_string(), "INT_32");
assert_eq!(ConvertedType::INT_64.to_string(), "INT_64");
assert_eq!(ConvertedType::JSON.to_string(), "JSON");
assert_eq!(ConvertedType::BSON.to_string(), "BSON");
assert_eq!(ConvertedType::INTERVAL.to_string(), "INTERVAL");
assert_eq!(ConvertedType::DECIMAL.to_string(), "DECIMAL")
}
#[test]
fn test_from_string_into_converted_type() {
assert_eq!(
ConvertedType::NONE
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::NONE
);
assert_eq!(
ConvertedType::UTF8
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::UTF8
);
assert_eq!(
ConvertedType::MAP
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::MAP
);
assert_eq!(
ConvertedType::MAP_KEY_VALUE
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::MAP_KEY_VALUE
);
assert_eq!(
ConvertedType::LIST
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::LIST
);
assert_eq!(
ConvertedType::ENUM
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::ENUM
);
assert_eq!(
ConvertedType::DECIMAL
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::DECIMAL
);
assert_eq!(
ConvertedType::DATE
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::DATE
);
assert_eq!(
ConvertedType::TIME_MILLIS
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::TIME_MILLIS
);
assert_eq!(
ConvertedType::TIME_MICROS
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::TIME_MICROS
);
assert_eq!(
ConvertedType::TIMESTAMP_MILLIS
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::TIMESTAMP_MILLIS
);
assert_eq!(
ConvertedType::TIMESTAMP_MICROS
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::TIMESTAMP_MICROS
);
assert_eq!(
ConvertedType::UINT_8
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::UINT_8
);
assert_eq!(
ConvertedType::UINT_16
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::UINT_16
);
assert_eq!(
ConvertedType::UINT_32
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::UINT_32
);
assert_eq!(
ConvertedType::UINT_64
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::UINT_64
);
assert_eq!(
ConvertedType::INT_8
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::INT_8
);
assert_eq!(
ConvertedType::INT_16
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::INT_16
);
assert_eq!(
ConvertedType::INT_32
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::INT_32
);
assert_eq!(
ConvertedType::INT_64
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::INT_64
);
assert_eq!(
ConvertedType::JSON
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::JSON
);
assert_eq!(
ConvertedType::BSON
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::BSON
);
assert_eq!(
ConvertedType::INTERVAL
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::INTERVAL
);
assert_eq!(
ConvertedType::DECIMAL
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::DECIMAL
)
}
// Exercises `ConvertedType::from(Option<LogicalType>)` for an absent logical type and
// for a table of logical types paired with the converted type each one must produce.
#[test]
fn test_logical_to_converted_type() {
    // An absent logical type maps to NONE.
    assert_eq!(ConvertedType::from(None::<LogicalType>), ConvertedType::NONE);

    // (logical type, expected converted type), in the order they are checked.
    let cases = vec![
        (
            LogicalType::Decimal {
                precision: 20,
                scale: 5,
            },
            ConvertedType::DECIMAL,
        ),
        (LogicalType::Bson, ConvertedType::BSON),
        (LogicalType::Json, ConvertedType::JSON),
        (LogicalType::String, ConvertedType::UTF8),
        (LogicalType::Date, ConvertedType::DATE),
        (
            LogicalType::Time {
                unit: TimeUnit::MILLIS,
                is_adjusted_to_u_t_c: true,
            },
            ConvertedType::TIME_MILLIS,
        ),
        (
            LogicalType::Time {
                unit: TimeUnit::MICROS,
                is_adjusted_to_u_t_c: true,
            },
            ConvertedType::TIME_MICROS,
        ),
        (
            LogicalType::Time {
                unit: TimeUnit::NANOS,
                is_adjusted_to_u_t_c: false,
            },
            ConvertedType::NONE,
        ),
        (
            LogicalType::Timestamp {
                unit: TimeUnit::MILLIS,
                is_adjusted_to_u_t_c: true,
            },
            ConvertedType::TIMESTAMP_MILLIS,
        ),
        (
            LogicalType::Timestamp {
                unit: TimeUnit::MICROS,
                is_adjusted_to_u_t_c: false,
            },
            ConvertedType::TIMESTAMP_MICROS,
        ),
        (
            LogicalType::Timestamp {
                unit: TimeUnit::NANOS,
                is_adjusted_to_u_t_c: false,
            },
            ConvertedType::NONE,
        ),
        (
            LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            },
            ConvertedType::UINT_8,
        ),
        (
            LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            },
            ConvertedType::INT_8,
        ),
        (
            LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            },
            ConvertedType::UINT_16,
        ),
        (
            LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            },
            ConvertedType::INT_16,
        ),
        (
            LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            },
            ConvertedType::UINT_32,
        ),
        (
            LogicalType::Integer {
                bit_width: 32,
                is_signed: true,
            },
            ConvertedType::INT_32,
        ),
        (
            LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            },
            ConvertedType::UINT_64,
        ),
        (
            LogicalType::Integer {
                bit_width: 64,
                is_signed: true,
            },
            ConvertedType::INT_64,
        ),
        (LogicalType::List, ConvertedType::LIST),
        (LogicalType::Map, ConvertedType::MAP),
        (LogicalType::Uuid, ConvertedType::NONE),
        (LogicalType::Enum, ConvertedType::ENUM),
        (LogicalType::Float16, ConvertedType::NONE),
        (LogicalType::Geometry { crs: None }, ConvertedType::NONE),
        (
            LogicalType::Geography {
                crs: None,
                algorithm: Some(EdgeInterpolationAlgorithm::default()),
            },
            ConvertedType::NONE,
        ),
        (LogicalType::Unknown, ConvertedType::NONE),
    ];

    for (logical, expected) in cases {
        assert_eq!(ConvertedType::from(Some(logical)), expected);
    }
}
// Feeds a representative value of each `LogicalType` variant through `test_roundtrip`.
#[test]
fn test_logical_type_roundtrip() {
    let samples = vec![
        LogicalType::String,
        LogicalType::Map,
        LogicalType::List,
        LogicalType::Enum,
        LogicalType::Decimal {
            scale: 0,
            precision: 20,
        },
        LogicalType::Date,
        LogicalType::Time {
            is_adjusted_to_u_t_c: true,
            unit: TimeUnit::MICROS,
        },
        LogicalType::Time {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MILLIS,
        },
        LogicalType::Time {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::NANOS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MICROS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: true,
            unit: TimeUnit::MILLIS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: true,
            unit: TimeUnit::NANOS,
        },
        LogicalType::Integer {
            bit_width: 8,
            is_signed: true,
        },
        LogicalType::Integer {
            bit_width: 16,
            is_signed: false,
        },
        LogicalType::Integer {
            bit_width: 32,
            is_signed: true,
        },
        LogicalType::Integer {
            bit_width: 64,
            is_signed: false,
        },
        LogicalType::Json,
        LogicalType::Bson,
        LogicalType::Uuid,
        LogicalType::Float16,
        // Variant with and without an explicit specification version.
        LogicalType::Variant {
            specification_version: Some(1),
        },
        LogicalType::Variant {
            specification_version: None,
        },
        // Geometry with and without a CRS string.
        LogicalType::Geometry {
            crs: Some(String::from("foo")),
        },
        LogicalType::Geometry { crs: None },
        // Geography across several CRS / edge-interpolation combinations.
        LogicalType::Geography {
            crs: Some(String::from("foo")),
            algorithm: Some(EdgeInterpolationAlgorithm::ANDOYER),
        },
        LogicalType::Geography {
            crs: None,
            algorithm: Some(EdgeInterpolationAlgorithm::KARNEY),
        },
        LogicalType::Geography {
            crs: Some(String::from("foo")),
            algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL),
        },
        LogicalType::Geography {
            crs: None,
            algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL),
        },
    ];

    for logical in samples {
        test_roundtrip(logical);
    }
}
// Each `Repetition` variant must render as its upper-case name.
#[test]
fn test_display_repetition() {
    let cases = [
        (Repetition::REQUIRED, "REQUIRED"),
        (Repetition::OPTIONAL, "OPTIONAL"),
        (Repetition::REPEATED, "REPEATED"),
    ];
    for (repetition, rendered) in cases {
        assert_eq!(repetition.to_string(), rendered);
    }
}
// Rendering a `Repetition` to a string and parsing it back must yield the same variant.
#[test]
fn test_from_string_into_repetition() {
    let variants = [
        Repetition::REQUIRED,
        Repetition::OPTIONAL,
        Repetition::REPEATED,
    ];
    for repetition in variants {
        let reparsed = repetition.to_string().parse::<Repetition>().unwrap();
        assert_eq!(reparsed, repetition);
    }
}
// Checks the string rendering of the listed `Encoding` variants.
#[test]
fn test_display_encoding() {
    let cases = [
        (Encoding::PLAIN, "PLAIN"),
        (Encoding::PLAIN_DICTIONARY, "PLAIN_DICTIONARY"),
        (Encoding::RLE, "RLE"),
        (Encoding::BIT_PACKED, "BIT_PACKED"),
        (Encoding::DELTA_BINARY_PACKED, "DELTA_BINARY_PACKED"),
        (Encoding::DELTA_LENGTH_BYTE_ARRAY, "DELTA_LENGTH_BYTE_ARRAY"),
        (Encoding::DELTA_BYTE_ARRAY, "DELTA_BYTE_ARRAY"),
        (Encoding::RLE_DICTIONARY, "RLE_DICTIONARY"),
    ];
    for (encoding, rendered) in cases {
        assert_eq!(encoding.to_string(), rendered);
    }
}
// `codec_to_string` must return only the codec name, with no level suffix for ZSTD.
#[test]
fn test_compression_codec_to_string() {
    let uncompressed = Compression::UNCOMPRESSED;
    assert_eq!(uncompressed.codec_to_string(), "UNCOMPRESSED");

    let zstd = Compression::ZSTD(ZstdLevel::default());
    assert_eq!(zstd.codec_to_string(), "ZSTD");
}
// Checks the string rendering of `Compression` values; codecs that carry a level are
// built with that level's default value.
#[test]
fn test_display_compression() {
    let cases = [
        (Compression::UNCOMPRESSED, "UNCOMPRESSED"),
        (Compression::SNAPPY, "SNAPPY"),
        (Compression::GZIP(GzipLevel::default()), "GZIP(GzipLevel(6))"),
        (Compression::LZO, "LZO"),
        (
            Compression::BROTLI(BrotliLevel::default()),
            "BROTLI(BrotliLevel(1))",
        ),
        (Compression::LZ4, "LZ4"),
        (Compression::ZSTD(ZstdLevel::default()), "ZSTD(ZstdLevel(1))"),
    ];
    for (compression, rendered) in cases {
        assert_eq!(compression.to_string(), rendered);
    }
}
// Each `PageType` variant must render as its upper-case name.
#[test]
fn test_display_page_type() {
    let cases = [
        (PageType::DATA_PAGE, "DATA_PAGE"),
        (PageType::INDEX_PAGE, "INDEX_PAGE"),
        (PageType::DICTIONARY_PAGE, "DICTIONARY_PAGE"),
        (PageType::DATA_PAGE_V2, "DATA_PAGE_V2"),
    ];
    for (page_type, rendered) in cases {
        assert_eq!(page_type.to_string(), rendered);
    }
}
// Each `SortOrder` variant must render as its upper-case name.
#[test]
fn test_display_sort_order() {
    let cases = [
        (SortOrder::SIGNED, "SIGNED"),
        (SortOrder::UNSIGNED, "UNSIGNED"),
        (SortOrder::UNDEFINED, "UNDEFINED"),
    ];
    for (sort_order, rendered) in cases {
        assert_eq!(sort_order.to_string(), rendered);
    }
}
// Checks the string rendering of `ColumnOrder`, including the wrapped sort order of
// `TYPE_DEFINED_ORDER`.
#[test]
fn test_display_column_order() {
    let cases = [
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            "TYPE_DEFINED_ORDER(SIGNED)",
        ),
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
            "TYPE_DEFINED_ORDER(UNSIGNED)",
        ),
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            "TYPE_DEFINED_ORDER(UNDEFINED)",
        ),
        (ColumnOrder::UNDEFINED, "UNDEFINED"),
    ];
    for (column_order, rendered) in cases {
        assert_eq!(column_order.to_string(), rendered);
    }
}
// Passes a type-defined, signed `ColumnOrder` through `test_roundtrip`.
#[test]
fn test_column_order_roundtrip() {
    let column_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    test_roundtrip(column_order)
}
// Verifies the sort order that `ColumnOrder::get_sort_order` reports when a logical
// type is supplied. Logical types are grouped by the order they are expected to yield.
#[test]
fn test_column_order_get_logical_type_sort_order() {
    // Asserts that every logical type in `types` resolves to `expected_order`. The
    // converted type is always NONE and the physical type always BYTE_ARRAY, so the
    // logical type is the only input that varies.
    fn check_sort_order(types: Vec<LogicalType>, expected_order: SortOrder) {
        for tpe in types {
            assert_eq!(
                ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY),
                expected_order
            );
        }
    }

    let unsigned = vec![
        LogicalType::String,
        LogicalType::Json,
        LogicalType::Bson,
        LogicalType::Enum,
        LogicalType::Uuid,
        LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        },
        LogicalType::Integer {
            bit_width: 16,
            is_signed: false,
        },
        LogicalType::Integer {
            bit_width: 32,
            is_signed: false,
        },
        LogicalType::Integer {
            bit_width: 64,
            is_signed: false,
        },
    ];
    check_sort_order(unsigned, SortOrder::UNSIGNED);

    let signed = vec![
        // Cover every signed width, mirroring the unsigned list above. The list used
        // to repeat the 8-bit case four times, leaving 16/32/64-bit untested.
        LogicalType::Integer {
            bit_width: 8,
            is_signed: true,
        },
        LogicalType::Integer {
            bit_width: 16,
            is_signed: true,
        },
        LogicalType::Integer {
            bit_width: 32,
            is_signed: true,
        },
        LogicalType::Integer {
            bit_width: 64,
            is_signed: true,
        },
        LogicalType::Decimal {
            scale: 20,
            precision: 4,
        },
        LogicalType::Date,
        LogicalType::Time {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MILLIS,
        },
        LogicalType::Time {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MICROS,
        },
        LogicalType::Time {
            is_adjusted_to_u_t_c: true,
            unit: TimeUnit::NANOS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MILLIS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: false,
            unit: TimeUnit::MICROS,
        },
        LogicalType::Timestamp {
            is_adjusted_to_u_t_c: true,
            unit: TimeUnit::NANOS,
        },
        LogicalType::Float16,
    ];
    check_sort_order(signed, SortOrder::SIGNED);

    let undefined = vec![
        LogicalType::List,
        LogicalType::Map,
        LogicalType::Geometry { crs: None },
        LogicalType::Geography {
            crs: None,
            algorithm: Some(EdgeInterpolationAlgorithm::default()),
        },
    ];
    check_sort_order(undefined, SortOrder::UNDEFINED);
}
// Verifies the sort order that `ColumnOrder::get_sort_order` reports when only a
// converted type is supplied (no logical type, physical type fixed to BYTE_ARRAY).
#[test]
fn test_column_order_get_converted_type_sort_order() {
    // Each entry pairs an expected sort order with the converted types that must
    // resolve to it; groups are checked in the order listed.
    let groups = [
        (
            SortOrder::UNSIGNED,
            vec![
                ConvertedType::UTF8,
                ConvertedType::JSON,
                ConvertedType::BSON,
                ConvertedType::ENUM,
                ConvertedType::UINT_8,
                ConvertedType::UINT_16,
                ConvertedType::UINT_32,
                ConvertedType::UINT_64,
            ],
        ),
        (
            SortOrder::SIGNED,
            vec![
                ConvertedType::INT_8,
                ConvertedType::INT_16,
                ConvertedType::INT_32,
                ConvertedType::INT_64,
                ConvertedType::DECIMAL,
                ConvertedType::DATE,
                ConvertedType::TIME_MILLIS,
                ConvertedType::TIME_MICROS,
                ConvertedType::TIMESTAMP_MILLIS,
                ConvertedType::TIMESTAMP_MICROS,
            ],
        ),
        (
            SortOrder::UNDEFINED,
            vec![
                ConvertedType::LIST,
                ConvertedType::MAP,
                ConvertedType::MAP_KEY_VALUE,
                ConvertedType::INTERVAL,
            ],
        ),
        // With no converted type the BYTE_ARRAY column is expected to be UNSIGNED.
        (SortOrder::UNSIGNED, vec![ConvertedType::NONE]),
    ];

    for (expected_order, converted_types) in groups {
        for converted in converted_types {
            assert_eq!(
                ColumnOrder::get_sort_order(None, converted, Type::BYTE_ARRAY),
                expected_order
            );
        }
    }
}
// Checks the default sort order reported for every physical `Type`.
#[test]
fn test_column_order_get_default_sort_order() {
    let cases = [
        (Type::BOOLEAN, SortOrder::UNSIGNED),
        (Type::INT32, SortOrder::SIGNED),
        (Type::INT64, SortOrder::SIGNED),
        (Type::INT96, SortOrder::UNDEFINED),
        (Type::FLOAT, SortOrder::SIGNED),
        (Type::DOUBLE, SortOrder::SIGNED),
        (Type::BYTE_ARRAY, SortOrder::UNSIGNED),
        (Type::FIXED_LEN_BYTE_ARRAY, SortOrder::UNSIGNED),
    ];
    for (physical_type, expected_order) in cases {
        assert_eq!(
            ColumnOrder::get_default_sort_order(physical_type),
            expected_order
        );
    }
}
// `sort_order()` must return the wrapped order for `TYPE_DEFINED_ORDER`, and SIGNED
// for `ColumnOrder::UNDEFINED`.
#[test]
fn test_column_order_sort_order() {
    let cases = [
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            SortOrder::SIGNED,
        ),
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
            SortOrder::UNSIGNED,
        ),
        (
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            SortOrder::UNDEFINED,
        ),
        (ColumnOrder::UNDEFINED, SortOrder::SIGNED),
    ];
    for (column_order, expected_order) in cases {
        assert_eq!(column_order.sort_order(), expected_order);
    }
}
// Parses encoding names (including one lower-case spelling) into `Encoding`, then
// checks the error message produced for an unknown name.
#[test]
fn test_parse_encoding() {
    let accepted = [
        ("PLAIN", Encoding::PLAIN),
        ("PLAIN_DICTIONARY", Encoding::PLAIN_DICTIONARY),
        ("RLE", Encoding::RLE),
        ("BIT_PACKED", Encoding::BIT_PACKED),
        ("DELTA_BINARY_PACKED", Encoding::DELTA_BINARY_PACKED),
        ("DELTA_LENGTH_BYTE_ARRAY", Encoding::DELTA_LENGTH_BYTE_ARRAY),
        ("DELTA_BYTE_ARRAY", Encoding::DELTA_BYTE_ARRAY),
        ("RLE_DICTIONARY", Encoding::RLE_DICTIONARY),
        ("BYTE_STREAM_SPLIT", Encoding::BYTE_STREAM_SPLIT),
        ("byte_stream_split", Encoding::BYTE_STREAM_SPLIT),
    ];
    for (text, expected) in accepted {
        let parsed: Encoding = text.parse().unwrap();
        assert_eq!(parsed, expected);
    }

    // An unrecognised name must be rejected with a descriptive error.
    let err = match "plain_xxx".parse::<Encoding>() {
        Ok(parsed) => panic!("Should not be able to parse {parsed:?}"),
        Err(err) => err,
    };
    assert_eq!(err.to_string(), "Parquet error: unknown encoding: plain_xxx");
}
// Parses compression settings, with and without a level, into `Compression`, then
// checks that malformed settings are rejected.
#[test]
fn test_parse_compression() {
    let accepted = [
        ("snappy", Compression::SNAPPY),
        ("lzo", Compression::LZO),
        ("zstd(3)", Compression::ZSTD(ZstdLevel::try_new(3).unwrap())),
        ("LZ4_RAW", Compression::LZ4_RAW),
        ("uncompressed", Compression::UNCOMPRESSED),
        ("gzip(9)", Compression::GZIP(GzipLevel::try_new(9).unwrap())),
        (
            "brotli(3)",
            Compression::BROTLI(BrotliLevel::try_new(3).unwrap()),
        ),
        ("lz4", Compression::LZ4),
    ];
    for (text, expected) in accepted {
        let parsed: Compression = text.parse().unwrap();
        assert_eq!(parsed, expected);
    }

    // The error cases used to parse into `Encoding`, so invalid compression strings
    // were never exercised. Parse into `Compression` instead: an unknown codec name
    // and an invalid (negative) level must both fail.
    // NOTE(review): the exact error text is not pinned here — confirm the wording
    // against the `FromStr for Compression` impl before asserting on it.
    for rejected in ["plain_xxx", "gzip(-10)"] {
        assert!(
            rejected.parse::<Compression>().is_err(),
            "expected {rejected:?} to be rejected as a compression setting"
        );
    }
}
// Each `BoundaryOrder` variant must render as its upper-case name.
#[test]
fn test_display_boundary_order() {
    let cases = [
        (BoundaryOrder::ASCENDING, "ASCENDING"),
        (BoundaryOrder::DESCENDING, "DESCENDING"),
        (BoundaryOrder::UNORDERED, "UNORDERED"),
    ];
    for (boundary_order, rendered) in cases {
        assert_eq!(boundary_order.to_string(), rendered);
    }
}
// Each `EdgeInterpolationAlgorithm` variant must render as its upper-case name.
#[test]
fn test_display_edge_algo() {
    let cases = [
        (EdgeInterpolationAlgorithm::SPHERICAL, "SPHERICAL"),
        (EdgeInterpolationAlgorithm::VINCENTY, "VINCENTY"),
        (EdgeInterpolationAlgorithm::THOMAS, "THOMAS"),
        (EdgeInterpolationAlgorithm::ANDOYER, "ANDOYER"),
        (EdgeInterpolationAlgorithm::KARNEY, "KARNEY"),
    ];
    for (algorithm, rendered) in cases {
        assert_eq!(algorithm.to_string(), rendered);
    }
}
// Parses mixed-case algorithm names into `EdgeInterpolationAlgorithm` and checks that
// an unknown name is rejected.
#[test]
fn test_from_str_edge_algo() {
    let accepted = [
        ("spHErical", EdgeInterpolationAlgorithm::SPHERICAL),
        ("vinceNTY", EdgeInterpolationAlgorithm::VINCENTY),
        ("tHOmas", EdgeInterpolationAlgorithm::THOMAS),
        ("anDOYEr", EdgeInterpolationAlgorithm::ANDOYER),
        ("kaRNey", EdgeInterpolationAlgorithm::KARNEY),
    ];
    for (text, expected) in accepted {
        let parsed = text.parse::<EdgeInterpolationAlgorithm>().unwrap();
        assert_eq!(parsed, expected);
    }

    let unknown = "does not exist".parse::<EdgeInterpolationAlgorithm>();
    assert!(unknown.is_err());
}
// Builds an `EncodingMask` from `encodings`, then checks that the mask reports all of
// them as set and that iterating the mask yields the same encodings in sorted order.
fn encodings_roundtrip(mut encodings: Vec<Encoding>) {
    // Sort first so the input can be compared directly with what the mask yields.
    encodings.sort();

    let mask = EncodingMask::new_from_encodings(encodings.iter());
    let every_input_set = mask.all_set(encodings.iter());
    assert!(every_input_set);

    let recovered: Vec<_> = mask.encodings().collect();
    assert_eq!(recovered, encodings);
}
// Round-trips several encoding sets through `EncodingMask`: an unsorted subset, the
// two dictionary encodings, the empty set, and a nine-encoding list.
#[test]
fn test_encoding_roundtrip() {
    encodings_roundtrip(vec![
        Encoding::RLE,
        Encoding::PLAIN,
        Encoding::DELTA_BINARY_PACKED,
    ]);
    encodings_roundtrip(vec![Encoding::RLE_DICTIONARY, Encoding::PLAIN_DICTIONARY]);
    encodings_roundtrip(Vec::new());

    let all_listed = vec![
        Encoding::PLAIN,
        Encoding::BIT_PACKED,
        Encoding::RLE,
        Encoding::DELTA_BINARY_PACKED,
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
        Encoding::PLAIN_DICTIONARY,
        Encoding::RLE_DICTIONARY,
        Encoding::BYTE_STREAM_SPLIT,
    ];
    encodings_roundtrip(all_listed);
}
// `EncodingMask::try_new` must reject the raw values below and report the offending
// value in hexadecimal.
#[test]
fn test_invalid_encoding_mask() {
    let rejected = [
        (
            -1,
            "Parquet error: Attempt to create invalid mask: 0xffffffff",
        ),
        (2, "Parquet error: Attempt to create invalid mask: 0x2"),
    ];
    for (raw_mask, message) in rejected {
        // `unwrap_err` panics if the mask was unexpectedly accepted.
        let err = EncodingMask::try_new(raw_mask).unwrap_err();
        assert_eq!(err.to_string(), message);
    }
}
// `is_only` must hold when the mask contains exactly the queried encoding, and must
// not hold once a second encoding is present.
#[test]
fn test_encoding_mask_is_only() {
    let plain_only = EncodingMask::new_from_encodings([Encoding::PLAIN].iter());
    assert!(plain_only.is_only(Encoding::PLAIN));

    let plain_and_dictionary = [Encoding::PLAIN, Encoding::PLAIN_DICTIONARY];
    let mixed = EncodingMask::new_from_encodings(plain_and_dictionary.iter());
    assert!(!mixed.is_only(Encoding::PLAIN));
}
}