use chrono_tz::Tz;
use combine::{
any,
error::StringStreamError,
many, many1, none_of, optional,
parser::char::{digit, spaces, string},
sep_by1, token, Parser,
};
use crate::{
binary::ReadEx,
errors::Result,
types::{
DateTimeType,
decimal::NoBits,
column::{
array::ArrayColumnData,
column_data::ColumnData,
date::DateColumnData,
datetime64::DateTime64ColumnData,
decimal::DecimalColumnData,
fixed_string::FixedStringColumnData,
ip::{IpColumnData, Ipv4, Ipv6, Uuid},
list::List,
nullable::NullableColumnData,
numeric::VectorColumnData,
string::StringColumnData,
BoxColumnWrapper, ArcColumnWrapper, ColumnWrapper,
enums::{Enum16ColumnData, Enum8ColumnData}
}
},
SqlType,
};
macro_rules! match_str {
($arg:ident, {
$( $($var:literal)|* => $doit:expr,)*
_ => $dothat:block
}) => {
$(
$(
if $arg.eq_ignore_ascii_case($var) {
$doit
} else
)*
)*
$dothat
}
}
impl dyn ColumnData {
#[allow(clippy::cognitive_complexity)]
pub(crate) fn load_data<W: ColumnWrapper, T: ReadEx>(
reader: &mut T,
type_name: &str,
size: usize,
tz: Tz,
) -> Result<W::Wrapper> {
Ok(match_str!(type_name, {
"UInt8" => W::wrap(VectorColumnData::<u8>::load(reader, size)?),
"UInt16" => W::wrap(VectorColumnData::<u16>::load(reader, size)?),
"UInt32" => W::wrap(VectorColumnData::<u32>::load(reader, size)?),
"UInt64" => W::wrap(VectorColumnData::<u64>::load(reader, size)?),
"Int8" | "TinyInt" => W::wrap(VectorColumnData::<i8>::load(reader, size)?),
"Int16" | "SmallInt" => W::wrap(VectorColumnData::<i16>::load(reader, size)?),
"Int32" | "Int" | "Integer" => W::wrap(VectorColumnData::<i32>::load(reader, size)?),
"Int64" | "BigInt" => W::wrap(VectorColumnData::<i64>::load(reader, size)?),
"Float32" | "Float" => W::wrap(VectorColumnData::<f32>::load(reader, size)?),
"Float64" | "Double" => W::wrap(VectorColumnData::<f64>::load(reader, size)?),
"String" | "Char" | "Varchar" | "Text" | "TinyText" | "MediumText" | "LongText" | "Blob" | "TinyBlob" | "MediumBlob" | "LongBlob" => W::wrap(StringColumnData::load(reader, size)?),
"Date" => W::wrap(DateColumnData::<u16>::load(reader, size, tz)?),
"DateTime" | "Timestamp" => W::wrap(DateColumnData::<u32>::load(reader, size, tz)?),
"IPv4" => W::wrap(IpColumnData::<Ipv4>::load(reader, size)?),
"IPv6" => W::wrap(IpColumnData::<Ipv6>::load(reader, size)?),
"UUID" => W::wrap(IpColumnData::<Uuid>::load(reader, size)?),
_ => {
if let Some(inner_type) = parse_nullable_type(type_name) {
W::wrap(NullableColumnData::load(reader, inner_type, size, tz)?)
} else if let Some(str_len) = parse_fixed_string(type_name) {
W::wrap(FixedStringColumnData::load(reader, size, str_len)?)
} else if let Some(inner_type) = parse_array_type(type_name) {
W::wrap(ArrayColumnData::load(reader, inner_type, size, tz)?)
} else if let Some((precision, scale, nobits)) = parse_decimal(type_name) {
W::wrap(DecimalColumnData::load(
reader, precision, scale, nobits, size, tz,
)?)
} else if let Some(items) = parse_enum8(type_name) {
W::wrap(Enum8ColumnData::load(reader, items, size, tz)?)
} else if let Some(items) = parse_enum16(type_name) {
W::wrap(Enum16ColumnData::load(reader, items, size, tz)?)
} else if let Some((precision, timezone)) = parse_date_time64(type_name) {
let column_timezone = get_timezone(&timezone, tz)?;
W::wrap(DateTime64ColumnData::load(reader, size, precision, column_timezone)?)
} else {
let message = format!("Unsupported column type \"{}\".", type_name);
return Err(message.into());
}
}
}))
}
pub(crate) fn from_type<W: ColumnWrapper>(
sql_type: SqlType,
timezone: Tz,
capacity: usize,
) -> Result<W::Wrapper> {
Ok(match sql_type {
SqlType::UInt8 => W::wrap(VectorColumnData::<u8>::with_capacity(capacity)),
SqlType::UInt16 => W::wrap(VectorColumnData::<u16>::with_capacity(capacity)),
SqlType::UInt32 => W::wrap(VectorColumnData::<u32>::with_capacity(capacity)),
SqlType::UInt64 => W::wrap(VectorColumnData::<u64>::with_capacity(capacity)),
SqlType::Int8 => W::wrap(VectorColumnData::<i8>::with_capacity(capacity)),
SqlType::Int16 => W::wrap(VectorColumnData::<i16>::with_capacity(capacity)),
SqlType::Int32 => W::wrap(VectorColumnData::<i32>::with_capacity(capacity)),
SqlType::Int64 => W::wrap(VectorColumnData::<i64>::with_capacity(capacity)),
SqlType::String => W::wrap(StringColumnData::with_capacity(capacity)),
SqlType::FixedString(len) => {
W::wrap(FixedStringColumnData::with_capacity(capacity, len))
}
SqlType::Float32 => W::wrap(VectorColumnData::<f32>::with_capacity(capacity)),
SqlType::Float64 => W::wrap(VectorColumnData::<f64>::with_capacity(capacity)),
SqlType::Ipv4 => W::wrap(IpColumnData::<Ipv4>::with_capacity(capacity)),
SqlType::Ipv6 => W::wrap(IpColumnData::<Ipv6>::with_capacity(capacity)),
SqlType::Uuid => W::wrap(IpColumnData::<Uuid>::with_capacity(capacity)),
SqlType::Date => W::wrap(DateColumnData::<u16>::with_capacity(capacity, timezone)),
SqlType::DateTime(DateTimeType::DateTime64(precision, timezone)) => W::wrap(
DateTime64ColumnData::with_capacity(capacity, precision, timezone),
),
SqlType::DateTime(_) => W::wrap(DateColumnData::<u32>::with_capacity(capacity, timezone)),
SqlType::Nullable(inner_type) => W::wrap(NullableColumnData {
inner: ColumnData::from_type::<ArcColumnWrapper>(inner_type.clone(), timezone, capacity)?,
nulls: Vec::new(),
}),
SqlType::Array(inner_type) => W::wrap(ArrayColumnData {
inner: ColumnData::from_type::<ArcColumnWrapper>(inner_type.clone(), timezone, capacity)?,
offsets: List::with_capacity(capacity),
}),
SqlType::Decimal(precision, scale) => {
let nobits = NoBits::from_precision(precision).unwrap();
let inner_type = match nobits {
NoBits::N32 => SqlType::Int32,
NoBits::N64 => SqlType::Int64,
};
W::wrap(DecimalColumnData {
inner: ColumnData::from_type::<BoxColumnWrapper>(
inner_type, timezone, capacity,
)?,
precision,
scale,
nobits,
})
}
SqlType::Enum8(enum_values) => W::wrap(Enum8ColumnData {
enum_values,
inner: ColumnData::from_type::<BoxColumnWrapper>(
SqlType::Int8,
timezone,
capacity,
)?,
}),
SqlType::Enum16(enum_values) => W::wrap(Enum16ColumnData {
enum_values,
inner: ColumnData::from_type::<BoxColumnWrapper>(
SqlType::Int16,
timezone,
capacity,
)?,
}),
})
}
}
fn parse_fixed_string(source: &str) -> Option<usize> {
if !source.starts_with("FixedString") {
return None;
}
let inner_size = &source[12..source.len() - 1];
match inner_size.parse::<usize>() {
Err(_) => None,
Ok(value) => Some(value),
}
}
fn parse_nullable_type(source: &str) -> Option<&str> {
if !source.starts_with("Nullable") {
return None;
}
let inner_type = &source[9..source.len() - 1];
if inner_type.starts_with("Nullable") {
return None;
}
Some(inner_type)
}
fn parse_array_type(source: &str) -> Option<&str> {
if !source.starts_with("Array") {
return None;
}
let inner_type = &source[6..source.len() - 1];
Some(inner_type)
}
fn parse_decimal(source: &str) -> Option<(u8, u8, NoBits)> {
if source.len() < 12 {
return None;
}
if !source.starts_with("Decimal") {
return None;
}
let mut nobits = None;
let mut precision = None;
let mut scale = None;
let mut params_indexes = (None, None);
for (idx, byte) in source.as_bytes().iter().enumerate() {
if *byte == b'(' {
match &source.as_bytes()[..idx] {
b"Decimal" => {}
b"Decimal32" => {
nobits = Some(NoBits::N32);
}
b"Decimal64" => {
nobits = Some(NoBits::N64);
}
_ => return None,
}
params_indexes.0 = Some(idx);
}
if *byte == b')' {
params_indexes.1 = Some(idx);
}
}
let params_indexes = match params_indexes {
(Some(start), Some(end)) => (start, end),
_ => return None,
};
match nobits {
Some(_) => {
scale = (&source[params_indexes.0 + 1..params_indexes.1])
.parse()
.ok()
}
None => {
for (idx, cell) in
(&source[params_indexes.0 + 1..params_indexes.1])
.split(',')
.map(|s| s.trim())
.enumerate()
{
match idx {
0 => precision = cell.parse().ok(),
1 => scale = cell.parse().ok(),
_ => return None,
}
}
}
}
match (precision, scale, nobits) {
(Some(precision), Some(scale), None) => {
if scale > precision {
return None;
}
if let Some(nobits) = NoBits::from_precision(precision) {
Some((precision, scale, nobits))
} else {
None
}
}
(None, Some(scale), Some(bits)) => {
let precision = match bits {
NoBits::N32 => 9,
NoBits::N64 => 18,
};
Some((precision, scale, bits))
}
_ => None,
}
}
enum EnumSize {
Enum8,
Enum16,
}
fn parse_enum8(input: &str) -> Option<Vec<(String, i8)>> {
match parse_enum(EnumSize::Enum8, input) {
Some(result) => {
let res: Vec<(String, i8)> = result
.iter()
.map(|(key, val)| (key.clone(), *val as i8))
.collect();
Some(res)
}
None => None,
}
}
fn parse_enum16(input: &str) -> Option<Vec<(String, i16)>> {
parse_enum(EnumSize::Enum16, input)
}
fn parse_enum(size: EnumSize, input: &str) -> Option<Vec<(String, i16)>> {
let size = match size {
EnumSize::Enum8 => "Enum8",
EnumSize::Enum16 => "Enum16",
};
let integer = optional(token('-'))
.and(many1::<String, _, _>(digit()))
.and_then(|(x, mut digits)| {
if let Some(x) = x {
digits.insert(0, x);
}
digits
.parse::<i16>()
.map_err(|_| StringStreamError::UnexpectedParse)
});
let word_syms = token('\\').with(any()).or(none_of("'".chars()));
let word = token('\'').with(many(word_syms)).skip(token('\''));
let pair = spaces()
.with(word)
.skip(spaces())
.skip(token('='))
.skip(spaces())
.and(integer)
.skip(spaces());
let enum_body = sep_by1::<Vec<(String, i16)>, _, _, _>(pair, token(','));
let mut parser = spaces()
.with(string(size))
.skip(spaces())
.skip(token('('))
.skip(spaces())
.with(enum_body)
.skip(token(')'));
let result = parser.parse(input);
if let Ok((res, remain)) = result {
if !remain.is_empty() {
return None;
}
Some(res)
} else {
None
}
}
fn parse_date_time64(source: &str) -> Option<(u32, Option<String>)> {
let integer = many1::<String, _, _>(digit()).and_then(|digits| {
digits
.parse::<u32>()
.map_err(|_| StringStreamError::UnexpectedParse)
});
let word_syms = token('\\').with(any()).or(none_of("'".chars()));
let word = token('\'')
.with(many::<String, _, _>(word_syms))
.skip(token('\''));
let timezone = optional(spaces().skip(token(',')).skip(spaces()).with(word));
let pair = spaces()
.with(integer)
.skip(spaces())
.and(timezone)
.skip(spaces());
let mut parser = spaces()
.with(string("DateTime64"))
.skip(spaces())
.skip(token('('))
.skip(spaces())
.with(pair)
.skip(spaces())
.skip(token(')'));
match parser.parse(source) {
Ok((pair, remain)) if remain.is_empty() => Some(pair),
_ => None,
}
}
fn get_timezone(timezone: &Option<String>, tz: Tz) -> Result<Tz> {
match timezone {
None => Ok(tz),
Some(t) => Ok(t.parse()?),
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_parse_decimal() {
assert_eq!(parse_decimal("Decimal(9, 4)"), Some((9, 4, NoBits::N32)));
assert_eq!(parse_decimal("Decimal(10, 4)"), Some((10, 4, NoBits::N64)));
assert_eq!(parse_decimal("Decimal(20, 4)"), None);
assert_eq!(parse_decimal("Decimal(2000, 4)"), None);
assert_eq!(parse_decimal("Decimal(3, 4)"), None);
assert_eq!(parse_decimal("Decimal(20, -4)"), None);
assert_eq!(parse_decimal("Decimal(0)"), None);
assert_eq!(parse_decimal("Decimal(1, 2, 3)"), None);
assert_eq!(parse_decimal("Decimal64(9)"), Some((18, 9, NoBits::N64)));
}
#[test]
fn test_parse_array_type() {
assert_eq!(parse_array_type("Array(UInt8)"), Some("UInt8"));
}
#[test]
fn test_parse_nullable_type() {
assert_eq!(parse_nullable_type("Nullable(Int8)"), Some("Int8"));
assert_eq!(parse_nullable_type("Int8"), None);
assert_eq!(parse_nullable_type("Nullable(Nullable(Int8))"), None);
}
#[test]
fn test_parse_fixed_string() {
assert_eq!(parse_fixed_string("FixedString(8)"), Some(8_usize));
assert_eq!(parse_fixed_string("FixedString(zz)"), None);
assert_eq!(parse_fixed_string("Int8"), None);
}
#[test]
fn test_parse_enum8() {
let enum8 = "Enum8 ('a' = 1, 'b' = 2)";
let res = parse_enum8(enum8).unwrap();
assert_eq!(res, vec![("a".to_owned(), 1), ("b".to_owned(), 2)])
}
#[test]
fn test_parse_enum16_special_chars() {
let enum16 = "Enum16('a_' = -128, 'b&' = 0)";
let res = parse_enum16(enum16).unwrap();
assert_eq!(res, vec![("a_".to_owned(), -128), ("b&".to_owned(), 0)])
}
#[test]
fn test_parse_enum8_single() {
let enum8 = "Enum8 ('a' = 1)";
let res = parse_enum8(enum8).unwrap();
assert_eq!(res, vec![("a".to_owned(), 1)])
}
#[test]
fn test_parse_enum8_empty_id() {
let enum8 = "Enum8 ('' = 1, '' = 2)";
let res = parse_enum8(enum8).unwrap();
assert_eq!(res, vec![("".to_owned(), 1), ("".to_owned(), 2)])
}
#[test]
fn test_parse_enum8_single_empty_id() {
let enum8 = "Enum8 ('' = 1)";
let res = parse_enum8(enum8).unwrap();
assert_eq!(res, vec![("".to_owned(), 1)])
}
#[test]
fn test_parse_enum8_extra_comma() {
let enum8 = "Enum8 ('a' = 1, 'b' = 2,)";
assert!(dbg!(parse_enum8(enum8)).is_none());
}
#[test]
fn test_parse_enum8_empty() {
let enum8 = "Enum8 ()";
assert!(dbg!(parse_enum8(enum8)).is_none());
}
#[test]
fn test_parse_enum8_no_value() {
let enum8 = "Enum8 ('a' =)";
assert!(dbg!(parse_enum8(enum8)).is_none());
}
#[test]
fn test_parse_enum8_no_ident() {
let enum8 = "Enum8 ( = 1)";
assert!(dbg!(parse_enum8(enum8)).is_none());
}
#[test]
fn test_parse_enum8_starting_comma() {
let enum8 = "Enum8 ( , 'a' = 1)";
assert!(dbg!(parse_enum8(enum8)).is_none());
}
#[test]
fn test_parse_enum16() {
let enum16 = "Enum16 ('a' = 1, 'b' = 2)";
let res = parse_enum16(enum16).unwrap();
assert_eq!(res, vec![("a".to_owned(), 1), ("b".to_owned(), 2)])
}
#[test]
fn test_parse_enum16_single() {
let enum16 = "Enum16 ('a' = 1)";
let res = parse_enum16(enum16).unwrap();
assert_eq!(res, vec![("a".to_owned(), 1)])
}
#[test]
fn test_parse_enum16_empty_id() {
let enum16 = "Enum16 ('' = 1, '' = 2)";
let res = parse_enum16(enum16).unwrap();
assert_eq!(res, vec![("".to_owned(), 1), ("".to_owned(), 2)])
}
#[test]
fn test_parse_enum16_single_empty_id() {
let enum16 = "Enum16 ('' = 1)";
let res = parse_enum16(enum16).unwrap();
assert_eq!(res, vec![("".to_owned(), 1)])
}
#[test]
fn test_parse_enum16_extra_comma() {
let enum16 = "Enum16 ('a' = 1, 'b' = 2,)";
assert!(dbg!(parse_enum16(enum16)).is_none());
}
#[test]
fn test_parse_enum16_empty() {
let enum16 = "Enum16 ()";
assert!(dbg!(parse_enum16(enum16)).is_none());
}
#[test]
fn test_parse_enum16_no_value() {
let enum16 = "Enum16 ('a' =)";
assert!(dbg!(parse_enum16(enum16)).is_none());
}
#[test]
fn test_parse_enum16_no_ident() {
let enum16 = "Enum16 ( = 1)";
assert!(dbg!(parse_enum16(enum16)).is_none());
}
#[test]
fn test_parse_enum16_starting_comma() {
let enum16 = "Enum16 ( , 'a' = 1)";
assert!(dbg!(parse_enum16(enum16)).is_none());
}
#[test]
fn test_parse_date_time64() {
let source = " DateTime64 ( 3 , 'Europe/Moscow' )";
let res = parse_date_time64(source).unwrap();
assert_eq!(res, (3, Some("Europe/Moscow".to_string())))
}
#[test]
fn test_parse_date_time64_without_timezone() {
let source = " DateTime64( 5 )";
let res = parse_date_time64(source).unwrap();
assert_eq!(res, (5, None))
}
}