use std::{
collections::{HashMap, HashSet},
fmt::Debug,
num::{ParseFloatError, ParseIntError},
};
use bytes::Bytes;
use chrono::{DateTime, LocalResult, ParseError as ChronoParseError, TimeZone as _, Utc};
use ordered_float::NotNan;
use snafu::{ResultExt, Snafu};
use super::datetime::{TimeZone, datetime_to_utc};
#[cfg(test)]
mod tests;
/// Error produced when a conversion type name cannot be turned into a
/// [`Conversion`].
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Snafu)]
pub enum ConversionError {
    /// The supplied name did not match any of the conversion names accepted
    /// by `Conversion::parse`; `name` holds the offending input as given.
#[snafu(display("Unknown conversion name {:?}", name))]
UnknownConversion { name: String },
}
/// Describes how a raw byte value should be converted into a typed value by
/// `Conversion::convert`.
#[derive(Clone, Debug)]
pub enum Conversion {
    /// Pass the bytes through unchanged.
Bytes,
    /// Parse the (lossily UTF-8 decoded) bytes as an `i64`.
Integer,
    /// Parse the (lossily UTF-8 decoded) bytes as an `f64`; NaN is rejected.
Float,
    /// Parse the bytes as a boolean keyword or integer (see `parse_bool`).
Boolean,
    /// Parse a timestamp by trying a list of well-known formats; the time
    /// zone is used for the formats that carry no zone information.
Timestamp(TimeZone),
    /// Parse a timestamp using the given format string, interpreting the
    /// result in the given time zone.
TimestampFmt(String, TimeZone),
    /// Parse a timestamp using the given format string, which itself carries
    /// a time zone specifier.
TimestampTzFmt(String),
}
/// Errors that can occur while converting a raw value with
/// `Conversion::convert`. Every variant keeps the offending input text in `s`.
#[derive(Debug, Eq, PartialEq, Snafu)]
pub enum Error {
    /// The input was neither a recognized boolean keyword nor an integer.
#[snafu(display("Invalid boolean value {:?}", s))]
BoolParse { s: String },
    /// The input could not be parsed as an `i64`.
#[snafu(display("Invalid integer {:?}: {}", s, source))]
IntParse { s: String, source: ParseIntError },
    /// The input parsed as a float but the value was NaN.
#[snafu(display("NaN number not supported {:?}", s))]
NanFloat { s: String },
    /// The input could not be parsed as an `f64`.
#[snafu(display("Invalid floating point number {:?}: {}", s, source))]
FloatParse { s: String, source: ParseFloatError },
    /// The input did not match the explicitly configured timestamp format.
    // NOTE(review): `visibility(pub(super))` presumably exposes the generated
    // context selector to the parent module — confirm against its users.
#[snafu(
display("Invalid timestamp {:?}: {}", s, source),
visibility(pub(super))
)]
TimestampParse { s: String, source: ChronoParseError },
    /// None of the automatically tried timestamp formats matched the input.
#[snafu(display("No matching timestamp format found for {:?}", s))]
AutoTimestampParse { s: String },
}
/// Builds a field-to-[`Conversion`] map from `types`, warning about fields
/// that are not listed in `names`.
///
/// Every key of `types` that does not appear in `names` triggers a
/// `tracing` warning; such entries are still converted. The actual parsing
/// is delegated to [`parse_conversion_map`].
///
/// # Errors
///
/// Returns a [`ConversionError`] if any type name in `types` is not a known
/// conversion.
#[allow(clippy::implicit_hasher)]
pub fn parse_check_conversion_map(
    types: &HashMap<String, String>,
    names: &[impl AsRef<str>],
    tz: TimeZone,
) -> Result<HashMap<String, Conversion>, ConversionError> {
    let known: HashSet<&str> = names.iter().map(AsRef::as_ref).collect();

    // Unknown fields are only reported, never rejected.
    types
        .keys()
        .filter(|field| !known.contains(field.as_str()))
        .for_each(|field| {
            tracing::warn!(
                message = "Field was specified in the types but is not a valid field name.",
                field = &field[..]
            );
        });

    parse_conversion_map(types, tz)
}
/// Parses every type name in `types` into a [`Conversion`], keyed by the
/// same field name.
///
/// `tz` is handed to [`Conversion::parse`] for each entry.
///
/// # Errors
///
/// Stops at, and returns, the first [`ConversionError`] encountered.
#[allow(clippy::implicit_hasher)]
pub fn parse_conversion_map(
    types: &HashMap<String, String>,
    tz: TimeZone,
) -> Result<HashMap<String, Conversion>, ConversionError> {
    let mut conversions = HashMap::with_capacity(types.len());
    for (field, typename) in types {
        let conversion = Conversion::parse(typename, tz)?;
        conversions.insert(field.clone(), conversion);
    }
    Ok(conversions)
}
impl Conversion {
pub fn parse(s: impl AsRef<str>, tz: TimeZone) -> Result<Self, ConversionError> {
let s = s.as_ref();
let mut split = s.splitn(2, '|').map(str::trim);
match (split.next(), split.next()) {
(Some("asis" | "bytes" | "string"), None) => Ok(Self::Bytes),
(Some("integer" | "int"), None) => Ok(Self::Integer),
(Some("float"), None) => Ok(Self::Float),
(Some("bool" | "boolean"), None) => Ok(Self::Boolean),
(Some("timestamp"), None) => Ok(Self::Timestamp(tz)),
(Some("timestamp"), Some(fmt)) => Ok(Self::timestamp(fmt, tz)),
_ => Err(ConversionError::UnknownConversion { name: s.into() }),
}
}
#[must_use]
pub fn timestamp(fmt: &str, tz: TimeZone) -> Self {
if format_has_zone(fmt) {
Self::TimestampTzFmt(fmt.into())
} else {
Self::TimestampFmt(fmt.into(), tz)
}
}
#[allow(clippy::trait_duplication_in_bounds)] pub fn convert<T>(&self, bytes: Bytes) -> Result<T, Error>
where
T: From<Bytes> + From<i64> + From<NotNan<f64>> + From<bool> + From<DateTime<Utc>>,
{
Ok(match self {
Self::Bytes => bytes.into(),
Self::Integer => {
let s = String::from_utf8_lossy(&bytes);
s.parse::<i64>()
.with_context(|_| IntParseSnafu { s })?
.into()
}
Self::Float => {
let s = String::from_utf8_lossy(&bytes);
let parsed = s
.parse::<f64>()
.with_context(|_| FloatParseSnafu { s: s.clone() })?;
let f = NotNan::new(parsed).map_err(|_| Error::NanFloat { s: s.to_string() })?;
f.into()
}
Self::Boolean => parse_bool(&String::from_utf8_lossy(&bytes))?.into(),
Self::Timestamp(tz) => parse_timestamp(*tz, &String::from_utf8_lossy(&bytes))?.into(),
Self::TimestampFmt(format, tz) => {
let s = String::from_utf8_lossy(&bytes);
let dt = tz
.datetime_from_str(&s, format)
.context(TimestampParseSnafu { s })?;
datetime_to_utc(&dt).into()
}
Self::TimestampTzFmt(format) => {
let s = String::from_utf8_lossy(&bytes);
let dt = DateTime::parse_from_str(&s, format)
.with_context(|_| TimestampParseSnafu { s })?;
datetime_to_utc(&dt).into()
}
})
}
}
fn parse_bool(s: &str) -> Result<bool, Error> {
match s {
"true" | "t" | "yes" | "y" => Ok(true),
"false" | "f" | "no" | "n" | "0" => Ok(false),
_ => {
if let Ok(n) = s.parse::<isize>() {
Ok(n != 0)
} else {
match s.to_lowercase().as_str() {
"true" | "t" | "yes" | "y" => Ok(true),
"false" | "f" | "no" | "n" => Ok(false),
_ => Err(Error::BoolParse { s: s.into() }),
}
}
}
}
}
/// Reports whether a chrono `strftime`-style format string contains a
/// specifier that carries time zone information.
///
/// Recognized specifiers are `%Z`, `%z`, `%:z`, `%::z`, `%:::z`, `%#z` and
/// `%+`. The format is scanned specifier by specifier rather than searched
/// for substrings, so an escaped percent sign (`%%`) is consumed as a
/// literal: `%%z` is a literal `%` followed by `z`, not a zone specifier.
fn format_has_zone(fmt: &str) -> bool {
    let mut rest = fmt;
    // `spec` is always the text immediately following an unescaped `%`.
    while let Some((_, spec)) = rest.split_once('%') {
        if let Some(after_escape) = spec.strip_prefix('%') {
            // `%%` is a literal percent sign; skip both characters.
            rest = after_escape;
            continue;
        }

        // `%z` may be preceded by up to three colons (`%:z`, `%::z`, `%:::z`).
        let after_colons = spec.trim_start_matches(':');
        let colons = spec.len() - after_colons.len();

        let has_zone = matches!(spec.chars().next(), Some('Z' | '+'))
            || spec.starts_with("#z")
            || (colons <= 3 && after_colons.starts_with('z'));
        if has_zone {
            return true;
        }

        rest = spec;
    }
    false
}
/// Timestamp formats that carry no time zone specifier.
///
/// `parse_timestamp` tries these first, in the order listed, parsing each in
/// the caller-supplied time zone.
const TIMESTAMP_LOCAL_FORMATS: &[&str] = &[
"%F %T", "%v %T", "%FT%T", "%m/%d/%Y:%T", "%a, %d %b %Y %T", "%a %d %b %T %Y", "%A %d %B %T %Y", "%a %b %e %T %Y", ];
/// Timestamp formats that include their own time zone specifier.
///
/// `parse_timestamp` tries these last, in the order listed, via
/// `DateTime::parse_from_str`, and converts the result to UTC.
const TIMESTAMP_TZ_FORMATS: &[&str] = &[
"%+", "%a %d %b %T %Z %Y", "%a %d %b %T %z %Y", "%a %d %b %T %#z %Y", "%d/%b/%Y:%T %z", ];
/// Interprets `timestamp_str` as a whole number of seconds since the Unix
/// epoch.
///
/// Returns `LocalResult::None` when the text is not a valid `i64`; otherwise
/// returns whatever `Utc.timestamp_opt` yields for that second count.
fn parse_unix_timestamp(timestamp_str: &str) -> LocalResult<DateTime<Utc>> {
    timestamp_str
        .parse::<i64>()
        .map_or(LocalResult::None, |seconds_since_epoch| {
            Utc.timestamp_opt(seconds_since_epoch, 0)
        })
}
fn parse_timestamp(tz: TimeZone, s: &str) -> Result<DateTime<Utc>, Error> {
for format in TIMESTAMP_LOCAL_FORMATS {
if let Ok(result) = tz.datetime_from_str(s, format) {
return Ok(result);
}
}
if let LocalResult::Single(result) = parse_unix_timestamp(s) {
return Ok(result);
}
if let Ok(result) = DateTime::parse_from_rfc3339(s) {
return Ok(datetime_to_utc(&result));
}
if let Ok(result) = DateTime::parse_from_rfc2822(s) {
return Ok(datetime_to_utc(&result));
}
for format in TIMESTAMP_TZ_FORMATS {
if let Ok(result) = DateTime::parse_from_str(s, format) {
return Ok(datetime_to_utc(&result));
}
}
Err(Error::AutoTimestampParse { s: s.into() })
}