use chrono::offset::LocalResult;
use chrono::{
DateTime, Datelike, Duration, FixedOffset, Months, NaiveDate, NaiveDateTime, Offset, TimeDelta,
TimeZone, Timelike, Utc,
};
use chrono_tz::Tz;
use datafusion::logical_expr::Expr;
use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{Between, Operator};
use datafusion::scalar::ScalarValue;
use std::ops::{Add, Sub};
use crate::ts_table_provider::ParsedTz;
/// Summary of how a filter expression constrains the timestamp column.
///
/// `Cmp` leaves compare the timestamp column against a fixed UTC instant and `And` / `Or` /
/// `Not` combine them. `True` / `False` are constant outcomes, `Unknown` marks a
/// sub-expression that could not be translated, and `NonTime` marks a sub-expression that
/// does not mention the timestamp column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TimePred {
    True,
    False,
    Unknown,
    NonTime,
    Cmp { op: Operator, ts: DateTime<Utc> },
    And(Box<TimePred>, Box<TimePred>),
    Or(Box<TimePred>, Box<TimePred>),
    Not(Box<TimePred>),
}

impl TimePred {
    /// Simplifying conjunction.
    ///
    /// `False` absorbs everything. Otherwise `True`, then `NonTime`, then `Unknown` operands
    /// are dropped in favour of the other side (each tier is checked on both operands before
    /// the next tier), so only real constraints are kept.
    pub(crate) fn and(a: TimePred, b: TimePred) -> TimePred {
        if a == TimePred::False || b == TimePred::False {
            return TimePred::False;
        }
        for neutral in [TimePred::True, TimePred::NonTime, TimePred::Unknown] {
            if a == neutral {
                return b;
            }
            if b == neutral {
                return a;
            }
        }
        TimePred::And(Box::new(a), Box::new(b))
    }

    /// Simplifying disjunction.
    ///
    /// `True` absorbs everything and `False` is the identity; any remaining `NonTime` or
    /// `Unknown` operand makes the whole disjunction `Unknown`.
    pub(crate) fn or(a: TimePred, b: TimePred) -> TimePred {
        use TimePred::{False, NonTime, Or, True, Unknown};
        match (a, b) {
            (True, _) | (_, True) => True,
            (False, rest) | (rest, False) => rest,
            (NonTime | Unknown, _) | (_, NonTime | Unknown) => Unknown,
            (lhs, rhs) => Or(Box::new(lhs), Box::new(rhs)),
        }
    }

    /// Simplifying negation: constants flip, double negation cancels, and `NonTime` /
    /// `Unknown` stay unknown.
    pub(crate) fn not(x: TimePred) -> TimePred {
        match x {
            TimePred::True => TimePred::False,
            TimePred::False => TimePred::True,
            TimePred::NonTime | TimePred::Unknown => TimePred::Unknown,
            TimePred::Not(inner) => *inner,
            other => TimePred::Not(Box::new(other)),
        }
    }
}
/// An interval literal normalised to the same three components as Arrow's
/// `IntervalMonthDayNano`: calendar months, days and nanoseconds.
#[derive(Debug, Clone, Copy)]
pub(crate) struct UnifiedInterval {
    pub(crate) months: i32,
    pub(crate) days: i32,
    pub(crate) nanos: i64,
}

impl UnifiedInterval {
    /// The empty interval (all components zero).
    fn zero() -> Self {
        UnifiedInterval { months: 0, days: 0, nanos: 0 }
    }

    /// Component-wise `self + sign * rhs`, saturating instead of overflowing.
    fn add(self, rhs: Self, sign: i32) -> Self {
        let scaled = |component: i32| component.saturating_mul(sign);
        UnifiedInterval {
            months: self.months.saturating_add(scaled(rhs.months)),
            days: self.days.saturating_add(scaled(rhs.days)),
            nanos: self
                .nanos
                .saturating_add(rhs.nanos.saturating_mul(i64::from(sign))),
        }
    }
}
/// Three-valued truth value combined with Kleene's strong logic: definitely true,
/// definitely false, or undetermined.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum IntervalTruth {
    AlwaysTrue,
    AlwaysFalse,
    MaybeTrue,
}

impl IntervalTruth {
    /// Conjunction: `AlwaysFalse` dominates and `AlwaysTrue` is the identity.
    pub(crate) fn and(self, rhs: IntervalTruth) -> IntervalTruth {
        use IntervalTruth::{AlwaysFalse, AlwaysTrue, MaybeTrue};
        if self == AlwaysFalse || rhs == AlwaysFalse {
            AlwaysFalse
        } else if self == AlwaysTrue {
            rhs
        } else if rhs == AlwaysTrue {
            self
        } else {
            MaybeTrue
        }
    }

    /// Disjunction, derived from `and` via De Morgan's law.
    pub(crate) fn or(self, rhs: IntervalTruth) -> IntervalTruth {
        self.not().and(rhs.not()).not()
    }

    /// Negation: the definite values swap, `MaybeTrue` is unchanged.
    pub(crate) fn not(self) -> IntervalTruth {
        match self {
            IntervalTruth::AlwaysTrue => IntervalTruth::AlwaysFalse,
            IntervalTruth::AlwaysFalse => IntervalTruth::AlwaysTrue,
            IntervalTruth::MaybeTrue => IntervalTruth::MaybeTrue,
        }
    }
}
/// Granularity of a `date_trunc` call that the pruning logic understands.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TruncUnit {
    /// Truncate to the whole second.
    Second,
    /// Truncate to the whole minute.
    Minute,
    /// Truncate to the whole hour.
    Hour,
    /// Truncate to midnight.
    Day,
}
/// Stride of a `date_bin` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BinStride {
    /// A fixed-length stride measured in nanoseconds.
    FixedNanos(i64),
    /// A calendar stride of whole months.
    Months(i64),
}
/// A recognised scalar function applied directly to the timestamp column.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TsTransform {
    /// `to_unixtime(ts)`.
    ToUnixtime,
    /// `to_date(ts)`.
    ToDate,
    /// `date_trunc(unit, ts)`.
    DateTrunc { unit: TruncUnit },
    /// `date_bin(stride, ts[, origin])`; `origin` is the Unix epoch when omitted.
    DateBin {
        stride: BinStride,
        origin: DateTime<Utc>,
    },
}
/// Peels off any stack of `Alias`, `Cast` and `TryCast` wrappers and returns the innermost
/// expression.
fn unwrap_expr(expr: &Expr) -> &Expr {
    let mut current = expr;
    loop {
        current = match current {
            Expr::Alias(alias) => &*alias.expr,
            Expr::Cast(cast) => &*cast.expr,
            Expr::TryCast(try_cast) => &*try_cast.expr,
            innermost => return innermost,
        };
    }
}
/// Reads a non-null numeric literal (alias/cast wrappers allowed) as `f64`.
///
/// Accepts 32/64-bit signed and unsigned integers and 32/64-bit floats; everything else
/// yields `None`.
fn expr_to_numeric(expr: &Expr) -> Option<f64> {
    let Expr::Literal(value, _) = unwrap_expr(expr) else {
        return None;
    };
    let number = match value {
        ScalarValue::Int64(Some(x)) => *x as f64,
        ScalarValue::Int32(Some(x)) => f64::from(*x),
        ScalarValue::UInt64(Some(x)) => *x as f64,
        ScalarValue::UInt32(Some(x)) => f64::from(*x),
        ScalarValue::Float64(Some(x)) => *x,
        ScalarValue::Float32(Some(x)) => f64::from(*x),
        _ => return None,
    };
    Some(number)
}
fn scalar_to_utc_datetime(v: &ScalarValue) -> Option<DateTime<Utc>> {
match v {
ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
let dt = DateTime::parse_from_rfc3339(s).ok()?;
Some(dt.with_timezone(&Utc))
}
ScalarValue::TimestampSecond(Some(x), _) => Some(Utc.timestamp_opt(*x, 0).single()?),
ScalarValue::TimestampMillisecond(Some(x), _) => {
Some(Utc.timestamp_millis_opt(*x).single()?)
}
ScalarValue::TimestampMicrosecond(Some(x), _) => {
let secs = x.div_euclid(1_000_000);
let micros = x.rem_euclid(1_000_000) as u32;
Some(Utc.timestamp_opt(secs, micros * 1000).single()?)
}
ScalarValue::TimestampNanosecond(Some(x), _) => {
let secs = x.div_euclid(1_000_000_000);
let nanos = x.rem_euclid(1_000_000_000) as u32;
Some(Utc.timestamp_opt(secs, nanos).single()?)
}
_ => None,
}
}
/// True when `expr`, ignoring alias/cast wrappers, is a reference to the column `ts_col`.
fn expr_is_ts(expr: &Expr, ts_col: &str) -> bool {
    matches!(unwrap_expr(expr), Expr::Column(col) if col.name == ts_col)
}
/// Whether `ts_col` is referenced anywhere inside `expr`.
///
/// Only binary expressions, `NOT`, `BETWEEN`, `IN` lists and scalar-function arguments are
/// searched (after stripping alias/cast wrappers); any other expression kind reports
/// `false`.
pub(crate) fn expr_mentions_ts(expr: &Expr, ts_col: &str) -> bool {
    match unwrap_expr(expr) {
        Expr::Column(col) => col.name == ts_col,
        Expr::Not(inner) => expr_mentions_ts(inner, ts_col),
        Expr::BinaryExpr(bin) => [&bin.left, &bin.right]
            .into_iter()
            .any(|side| expr_mentions_ts(side, ts_col)),
        Expr::Between(between) => [&between.expr, &between.low, &between.high]
            .into_iter()
            .any(|part| expr_mentions_ts(part, ts_col)),
        Expr::InList(in_list) => {
            expr_mentions_ts(&in_list.expr, ts_col)
                || in_list
                    .list
                    .iter()
                    .any(|item| expr_mentions_ts(item, ts_col))
        }
        Expr::ScalarFunction(func) => func.args.iter().any(|arg| expr_mentions_ts(arg, ts_col)),
        _ => false,
    }
}
/// Parses a `YYYY-MM-DD` string as midnight UTC on that date.
fn parse_date_str(s: &str) -> Option<DateTime<Utc>> {
    let midnight = NaiveDate::parse_from_str(s, "%Y-%m-%d")
        .ok()?
        .and_hms_opt(0, 0, 0)?;
    Some(Utc.from_utc_datetime(&midnight))
}
/// Converts a (possibly fractional) number of seconds since the Unix epoch to a UTC
/// instant, rounding the fractional part to the nearest nanosecond.
///
/// Returns `None` for non-finite input or when the result is outside chrono's range.
fn unix_seconds_to_datetime(secs: f64) -> Option<DateTime<Utc>> {
    if !secs.is_finite() {
        return None;
    }
    let whole = secs.trunc() as i64;
    let frac = secs - (whole as f64);
    let nanos = (frac * 1e9).round() as i64;
    // Normalise the sub-second part into [0, 1e9). Rounding can produce exactly ±1e9
    // (e.g. 1.9999999999 s), which must carry into the seconds field; handing 1e9 to chrono
    // would be read as a leap second or rejected.
    let (adj_secs, adj_nanos) = if nanos >= 1_000_000_000 {
        (whole.checked_add(1)?, nanos - 1_000_000_000)
    } else if nanos < 0 {
        (whole.checked_sub(1)?, nanos + 1_000_000_000)
    } else {
        (whole, nanos)
    };
    Utc.timestamp_opt(adj_secs, u32::try_from(adj_nanos).ok()?)
        .single()
}
fn parse_ts_literal(expr: &Expr) -> Option<DateTime<Utc>> {
match unwrap_expr(expr) {
Expr::Literal(v, _) => scalar_to_utc_datetime(v).or_else(|| {
if let ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) = v {
parse_date_str(s)
} else {
None
}
}),
Expr::ScalarFunction(sf) => {
let name = sf.name().to_ascii_lowercase();
let args = &sf.args;
if name == "to_timestamp"
|| name == "to_timestamp_seconds"
|| name == "to_timestamp_millis"
|| name == "to_timestamp_micros"
|| name == "to_timestamp_nanos"
{
if args.len() != 1 {
return None;
}
if let Some(dt) = parse_ts_literal(&args[0]) {
return Some(dt);
}
let n = expr_to_numeric(&args[0])?;
return match name.as_str() {
"to_timestamp" | "to_timestamp_seconds" => unix_seconds_to_datetime(n),
"to_timestamp_millis" => unix_seconds_to_datetime(n / 1_000.0),
"to_timestamp_micros" => unix_seconds_to_datetime(n / 1_000_000.0),
"to_timestamp_nanos" => unix_seconds_to_datetime(n / 1_000_000_000.0),
_ => None,
};
}
None
}
_ => None,
}
}
/// The value of a boolean scalar, or `None` for `NULL` and non-boolean scalars.
fn scalar_to_bool(v: &ScalarValue) -> Option<bool> {
    if let ScalarValue::Boolean(flag) = v {
        *flag
    } else {
        None
    }
}
/// Normalises any of Arrow's three interval scalar types into a `UnifiedInterval`
/// (day-time milliseconds become nanoseconds). `NULL`s and non-intervals yield `None`.
fn interval_from_scalar(v: &ScalarValue) -> Option<UnifiedInterval> {
    let (months, days, nanos) = match v {
        ScalarValue::IntervalMonthDayNano(Some(iv)) => (iv.months, iv.days, iv.nanoseconds),
        ScalarValue::IntervalDayTime(Some(iv)) => {
            (0, iv.days, i64::from(iv.milliseconds) * 1_000_000)
        }
        ScalarValue::IntervalYearMonth(Some(total_months)) => (*total_months, 0, 0),
        _ => return None,
    };
    Some(UnifiedInterval { months, days, nanos })
}
/// Recognises `ts ± interval ± interval ...` (the timestamp may also be the right operand
/// of `+`) and returns the net interval added to the timestamp column.
///
/// Returns `None` in these cases:
/// * the expression has another shape;
/// * the timestamp column is subtracted (`interval - ts`);
/// * any interval literal has a month component.
///
/// Months are rejected because adding months clamps the day of month, so the callers'
/// inversion `ts op (literal - interval)` is not exact. For example,
/// `ts + 1 month < '2023-03-31'` holds for `ts = 2023-02-28T12:00`, yet inverting it gives
/// `ts < 2023-02-28T00:00`. Likewise `ts + 1 month - 1 month` nets to zero but maps
/// Mar 31 to Mar 30. Day/nanosecond components are inverted by `add_interval` as fixed
/// shifts of a UTC instant.
// NOTE(review): for time-zone-aware columns, confirm the engine also treats day intervals
// as fixed 24h shifts; DST-aware day arithmetic would need the same treatment as months.
fn extract_ts_with_interval(expr: &Expr, ts_col: &str) -> Option<UnifiedInterval> {
    fn walk(expr: &Expr, ts_col: &str) -> Option<(UnifiedInterval, bool)> {
        if expr_is_ts(expr, ts_col) {
            return Some((UnifiedInterval::zero(), true));
        }
        if let Expr::BinaryExpr(be) = expr
            && matches!(be.op, Operator::Plus | Operator::Minus)
        {
            let (left_iv, left_has_ts) = walk(&be.left, ts_col)?;
            let (right_iv, right_has_ts) = walk(&be.right, ts_col)?;
            // Exactly one side must contain the timestamp column.
            return match (be.op, left_has_ts, right_has_ts) {
                (Operator::Plus, true, false) => Some((left_iv.add(right_iv, 1), true)),
                (Operator::Plus, false, true) => Some((right_iv.add(left_iv, 1), true)),
                (Operator::Minus, true, false) => Some((left_iv.add(right_iv, -1), true)),
                _ => None,
            };
        }
        if let Expr::Literal(v, _) = expr
            && let Some(iv) = interval_from_scalar(v)
            // Month arithmetic is not exactly invertible; see the doc comment above.
            && iv.months == 0
        {
            return Some((iv, false));
        }
        None
    }
    match walk(expr, ts_col)? {
        (net, true) => Some(net),
        (_, false) => None,
    }
}
pub(crate) fn add_interval(
dt: DateTime<Utc>,
iv: UnifiedInterval,
sign: i32,
) -> Option<DateTime<Utc>> {
use chrono::{Duration, Months};
let mut out = dt;
let months = iv.months.saturating_mul(sign);
if months != 0 {
if months > 0 {
out = out.checked_add_months(Months::new(months as u32))?;
} else {
out = out.checked_sub_months(Months::new((-months) as u32))?;
}
}
let days = iv.days.saturating_mul(sign);
if days != 0 {
out = out.checked_add_signed(Duration::days(days as i64))?;
}
let nanos = iv.nanos.saturating_mul(sign as i64);
if nanos != 0 {
out = out.checked_add_signed(Duration::nanoseconds(nanos))?;
}
Some(out)
}
/// Mirrors a comparison so that `a op b` can be rewritten as `b flip_op(op) a`.
/// `=` and `<>` are symmetric; non-comparison operators yield `None`.
fn flip_op(op: Operator) -> Option<Operator> {
    let mirrored = match op {
        Operator::Lt => Operator::Gt,
        Operator::Gt => Operator::Lt,
        Operator::LtEq => Operator::GtEq,
        Operator::GtEq => Operator::LtEq,
        Operator::Eq | Operator::NotEq => op,
        _ => return None,
    };
    Some(mirrored)
}
/// Returns an owned copy of a non-null `Utf8` / `LargeUtf8` literal (alias/cast wrappers
/// allowed).
fn scalar_to_string_literal(expr: &Expr) -> Option<String> {
    if let Expr::Literal(ScalarValue::Utf8(Some(text)) | ScalarValue::LargeUtf8(Some(text)), _) =
        unwrap_expr(expr)
    {
        Some(text.clone())
    } else {
        None
    }
}
/// Parses a `date_trunc` unit name, case-insensitively, in singular or plural form.
fn parse_trunc_unit(s: &str) -> Option<TruncUnit> {
    let lowered = s.to_ascii_lowercase();
    // Accept exactly one optional trailing "s" ("hour" and "hours", but not "hourss").
    let singular = lowered.strip_suffix('s').unwrap_or(&lowered);
    match singular {
        "second" => Some(TruncUnit::Second),
        "minute" => Some(TruncUnit::Minute),
        "hour" => Some(TruncUnit::Hour),
        "day" => Some(TruncUnit::Day),
        _ => None,
    }
}
/// Interprets the stride argument of `date_bin`.
///
/// Day-time intervals, and month-day-nano intervals without months, become a fixed
/// nanosecond stride. Month-day-nano intervals whose only component is months become a
/// calendar stride. Zero strides, mixed month/day strides and anything else yield `None`.
fn parse_date_bin_stride(expr: &Expr) -> Option<BinStride> {
    let Expr::Literal(value, _) = unwrap_expr(expr) else {
        return None;
    };
    let fixed_nanos = match value {
        ScalarValue::IntervalDayTime(Some(iv)) => {
            let span = TimeDelta::try_days(i64::from(iv.days))?
                + TimeDelta::try_milliseconds(i64::from(iv.milliseconds))?;
            span.num_nanoseconds()?
        }
        ScalarValue::IntervalMonthDayNano(Some(iv)) if iv.months != 0 => {
            // A calendar stride must be made of months alone.
            let pure_months = iv.days == 0 && iv.nanoseconds == 0;
            return pure_months.then_some(BinStride::Months(i64::from(iv.months)));
        }
        ScalarValue::IntervalMonthDayNano(Some(iv)) => {
            let span =
                TimeDelta::try_days(i64::from(iv.days))? + Duration::nanoseconds(iv.nanoseconds);
            span.num_nanoseconds()?
        }
        _ => return None,
    };
    (fixed_nanos != 0).then_some(BinStride::FixedNanos(fixed_nanos))
}
/// Converts signed nanoseconds since the Unix epoch into a UTC instant.
fn datetime_from_nanos(nanos: i64) -> Option<DateTime<Utc>> {
    const NANOS_PER_SEC: i64 = 1_000_000_000;
    let (whole_secs, subsec) = (
        nanos.div_euclid(NANOS_PER_SEC),
        nanos.rem_euclid(NANOS_PER_SEC),
    );
    Utc.timestamp_opt(whole_secs, subsec as u32).single()
}
fn datetime_to_nanos(dt: DateTime<Utc>) -> Option<i64> {
dt.timestamp_nanos_opt()
}
/// Rounds `time_diff` down to a multiple of a positive `stride`.
///
/// Rust's `%` truncates toward zero, so a negative difference with a non-zero remainder
/// needs one extra stride subtracted to reach the floor. Negative strides get no such
/// adjustment.
fn date_bin_compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let toward_zero = time_diff - remainder;
    let needs_floor = remainder != 0 && time_diff < 0 && stride > 1;
    if needs_floor {
        toward_zero - stride
    } else {
        toward_zero
    }
}
/// Start (in epoch nanoseconds) of the `stride_nanos`-wide bin anchored at `origin` that
/// contains `source`; `None` if the arithmetic overflows.
fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Option<i64> {
    let diff = source.checked_sub(origin)?;
    let remainder = diff % stride_nanos;
    let mut offset = diff - remainder;
    // `%` truncates toward zero; step back one stride to floor negative differences.
    if diff < 0 && stride_nanos > 1 && remainder != 0 {
        offset -= stride_nanos;
    }
    origin.checked_add(offset)
}
/// Moves `dt` by a signed number of calendar months (chrono clamps the day of month).
// NOTE(review): magnitudes above `u32::MAX` are silently truncated by the cast.
fn add_months_checked(dt: DateTime<Utc>, months: i64) -> Option<DateTime<Utc>> {
    let magnitude = Months::new(months.unsigned_abs() as u32);
    if months >= 0 {
        dt.checked_add_months(magnitude)
    } else {
        dt.checked_sub_months(magnitude)
    }
}
/// Start of the `stride_months`-month bin, counted from `origin`, that contains `source`.
fn date_bin_months_interval(
    stride_months: i64,
    source: DateTime<Utc>,
    origin: DateTime<Utc>,
) -> Option<DateTime<Utc>> {
    let month_index = |dt: DateTime<Utc>| dt.year() * 12 + dt.month() as i32;
    let month_diff = i64::from(month_index(source) - month_index(origin));
    let delta = date_bin_compute_distance(month_diff, stride_months);
    let candidate = add_months_checked(origin, delta)?;
    if candidate <= source {
        Some(candidate)
    } else {
        // `month_diff` ignores day of month and time of day, so the candidate can land
        // after `source`; the containing bin is then one stride earlier.
        add_months_checked(origin, delta - stride_months)
    }
}
/// Recognises `to_unixtime(ts)`, `to_date(ts)`, `date_trunc('<unit>', ts)` and
/// `date_bin(<interval>, ts[, <timestamp>])` applied directly to the timestamp column.
/// Function names are matched case-insensitively. Anything else, including unparseable
/// unit/stride/origin arguments, yields `None`.
fn match_ts_transform(expr: &Expr, ts_col: &str) -> Option<TsTransform> {
    let Expr::ScalarFunction(sf) = unwrap_expr(expr) else {
        return None;
    };
    let args = &sf.args;
    match sf.name().to_ascii_lowercase().as_str() {
        "to_unixtime" if args.len() == 1 && expr_is_ts(&args[0], ts_col) => {
            Some(TsTransform::ToUnixtime)
        }
        "to_date" if args.len() == 1 && expr_is_ts(&args[0], ts_col) => Some(TsTransform::ToDate),
        "date_trunc" if args.len() == 2 => {
            let unit = parse_trunc_unit(&scalar_to_string_literal(&args[0])?)?;
            expr_is_ts(&args[1], ts_col).then_some(TsTransform::DateTrunc { unit })
        }
        "date_bin" if matches!(args.len(), 2 | 3) => {
            let stride = parse_date_bin_stride(&args[0])?;
            if !expr_is_ts(&args[1], ts_col) {
                return None;
            }
            let origin = match args.get(2) {
                Some(origin_expr) => parse_ts_literal(origin_expr)?,
                None => Utc.timestamp_opt(0, 0).single()?,
            };
            Some(TsTransform::DateBin { stride, origin })
        }
        _ => None,
    }
}
/// Reads a date literal: a `Date32`, a `YYYY-MM-DD` string, or the UTC calendar date of a
/// timestamp literal.
fn parse_date_literal(expr: &Expr) -> Option<NaiveDate> {
    let Expr::Literal(value, _) = unwrap_expr(expr) else {
        return None;
    };
    match value {
        ScalarValue::Date32(Some(days_since_epoch)) => NaiveDate::from_ymd_opt(1970, 1, 1)?
            .checked_add_signed(Duration::days(i64::from(*days_since_epoch))),
        ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
            NaiveDate::parse_from_str(s, "%Y-%m-%d").ok()
        }
        other => scalar_to_utc_datetime(other).map(|dt| dt.date_naive()),
    }
}
/// Half-open UTC range `[start, end)` of instants whose local calendar date in `tz` is
/// `date`. Returns `None` when no zone is given or a boundary cannot be resolved.
///
/// If a local midnight is ambiguous (clocks fall back across it), both bounds use the
/// earliest mapping: the local date changes at the first occurrence of that midnight and
/// does not revert at the second. Taking the latest mapping for `end` would make
/// `to_date(ts) > date` (compiled as `ts >= end`) skip rows between the two occurrences
/// that already belong to the next day. A midnight skipped by a DST gap yields `None`.
fn date_bounds_utc(
    date: NaiveDate,
    tz: Option<&ParsedTz>,
) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
    let tz = tz?;
    let start_naive = date.and_hms_opt(0, 0, 0)?;
    let end_naive = (date + Duration::days(1)).and_hms_opt(0, 0, 0)?;
    match tz {
        ParsedTz::Utc => Some((
            Utc.from_utc_datetime(&start_naive),
            Utc.from_utc_datetime(&end_naive),
        )),
        ParsedTz::Fixed(off) => {
            let s = off.from_local_datetime(&start_naive).single()?;
            let e = off.from_local_datetime(&end_naive).single()?;
            Some((s.with_timezone(&Utc), e.with_timezone(&Utc)))
        }
        ParsedTz::Olson(tz) => {
            let s = tz.from_local_datetime(&start_naive).earliest()?;
            let e = tz.from_local_datetime(&end_naive).earliest()?;
            Some((s.with_timezone(&Utc), e.with_timezone(&Utc)))
        }
    }
}
/// Floors a UTC instant to the start of its UTC second, minute, hour or day, and reports
/// whether the instant already sat exactly on that boundary.
fn floor_to_unit(dt: DateTime<Utc>, unit: TruncUnit) -> Option<(DateTime<Utc>, bool)> {
    let secs_per_unit: i64 = match unit {
        TruncUnit::Second => 1,
        TruncUnit::Minute => 60,
        TruncUnit::Hour => 3_600,
        TruncUnit::Day => {
            let midnight = Utc.from_utc_datetime(&dt.date_naive().and_hms_opt(0, 0, 0)?);
            return Some((midnight, dt == midnight));
        }
    };
    let secs = dt.timestamp();
    let floored = secs.div_euclid(secs_per_unit) * secs_per_unit;
    let floor_dt = Utc.timestamp_opt(floored, 0).single()?;
    let on_boundary = floored == secs && dt.timestamp_subsec_nanos() == 0;
    Some((floor_dt, on_boundary))
}
/// Truncates a wall-clock time to the start of its `unit`, always dropping sub-second
/// precision.
fn trunc_local_naive(local: NaiveDateTime, unit: TruncUnit) -> Option<NaiveDateTime> {
    let (hour, minute, second) = match unit {
        TruncUnit::Second => (local.hour(), local.minute(), local.second()),
        TruncUnit::Minute => (local.hour(), local.minute(), 0),
        TruncUnit::Hour => (local.hour(), 0, 0),
        TruncUnit::Day => (0, 0, 0),
    };
    local.date().and_hms_opt(hour, minute, second)
}
fn floor_to_unit_tz(
dt_utc: DateTime<Utc>,
unit: TruncUnit,
tz: Option<&ParsedTz>,
) -> Option<(DateTime<Utc>, bool)> {
if tz.is_none()
|| matches!(tz, Some(ParsedTz::Utc))
|| matches!(unit, TruncUnit::Second | TruncUnit::Minute)
{
return floor_to_unit(dt_utc, unit);
}
match tz? {
ParsedTz::Fixed(off) => {
let local = dt_utc.with_timezone(off).naive_local();
let truncated = trunc_local_naive(local, unit)?;
let local_dt = off.from_local_datetime(&truncated).single()?;
let flo = local_dt.with_timezone(&Utc);
Some((flo, dt_utc == flo))
}
ParsedTz::Olson(tz) => {
let local = dt_utc.with_timezone(tz).naive_local();
let truncated = trunc_local_naive(local, unit)?;
let adjusted = TimeDelta::try_hours(3)?;
let flo_local = match truncated.and_local_timezone(*tz) {
LocalResult::Single(x) => Some(x),
LocalResult::Ambiguous(a, b) => {
let orig = dt_utc.with_timezone(tz);
if a.offset().fix() == orig.offset().fix() {
Some(a)
} else {
Some(b)
}
}
LocalResult::None => {
truncated
.sub(adjusted)
.and_local_timezone(*tz)
.single()
.map(|v| v.add(adjusted))
}
}?;
let flo = flo_local.with_timezone(&Utc);
Some((flo, dt_utc == flo))
}
ParsedTz::Utc => floor_to_unit(dt_utc, unit),
}
}
/// Fixed UTC length of one truncation unit (a day is taken as exactly 24 hours).
fn unit_duration(u: TruncUnit) -> Option<Duration> {
    let seconds = match u {
        TruncUnit::Second => 1,
        TruncUnit::Minute => 60,
        TruncUnit::Hour => 60 * 60,
        TruncUnit::Day => 24 * 60 * 60,
    };
    Some(Duration::seconds(seconds))
}
/// Wall-clock start of the `unit` that follows the (already truncated) local time `local`.
/// Day boundaries are always the next calendar midnight.
fn next_boundary_local(local: NaiveDateTime, unit: TruncUnit) -> Option<NaiveDateTime> {
    let step = match unit {
        TruncUnit::Second => Duration::seconds(1),
        TruncUnit::Minute => Duration::minutes(1),
        TruncUnit::Hour => Duration::hours(1),
        TruncUnit::Day => return (local.date() + Duration::days(1)).and_hms_opt(0, 0, 0),
    };
    Some(local + step)
}
/// Resolves a wall-clock time in `tz` to a UTC instant.
///
/// An ambiguous time (clocks falling back) resolves to the mapping whose offset equals
/// `orig_offset`, or else the later mapping. A non-existent time (clocks springing forward)
/// is resolved by mapping the wall-clock time three hours earlier and then moving three
/// hours forward in absolute time.
fn local_to_utc(tz: &Tz, local: NaiveDateTime, orig_offset: FixedOffset) -> Option<DateTime<Utc>> {
    let resolved = match local.and_local_timezone(*tz) {
        LocalResult::Single(dt) => dt,
        LocalResult::Ambiguous(first, second) => {
            if first.offset().fix() == orig_offset {
                first
            } else {
                second
            }
        }
        LocalResult::None => {
            let shift = TimeDelta::try_hours(3)?;
            local
                .sub(shift)
                .and_local_timezone(*tz)
                .single()?
                .add(shift)
        }
    };
    Some(resolved.with_timezone(&Utc))
}
/// Compiles `ts [NOT] BETWEEN low AND high` with constant bounds into an inclusive range
/// predicate (negated when requested); any other shape yields `Unknown`.
fn compile_between(b: &Between, ts_col: &str) -> TimePred {
    if !expr_is_ts(&b.expr, ts_col) {
        return TimePred::Unknown;
    }
    let Some((low, high)) = parse_ts_literal(&b.low).zip(parse_ts_literal(&b.high)) else {
        return TimePred::Unknown;
    };
    let within = TimePred::and(
        TimePred::Cmp {
            op: Operator::GtEq,
            ts: low,
        },
        TimePred::Cmp {
            op: Operator::LtEq,
            ts: high,
        },
    );
    if !b.negated {
        within
    } else {
        TimePred::not(within)
    }
}
fn compile_in_list(il: &InList, ts_col: &str) -> TimePred {
if !expr_is_ts(&il.expr, ts_col) {
return TimePred::Unknown;
}
let mut dts = Vec::with_capacity(il.list.len());
for e in &il.list {
match parse_ts_literal(e) {
Some(dt) => dts.push(dt),
None => return TimePred::Unknown,
}
}
if dts.is_empty() {
return if il.negated {
TimePred::Unknown
} else {
TimePred::False
};
}
let mut p = TimePred::Cmp {
op: Operator::Eq,
ts: dts[0],
};
for dt in dts.into_iter().skip(1) {
p = TimePred::or(
p,
TimePred::Cmp {
op: Operator::Eq,
ts: dt,
},
);
}
if il.negated { TimePred::not(p) } else { p }
}
fn compile_transform_cmp(
tx: TsTransform,
op: Operator,
other: &Expr,
tz: Option<&ParsedTz>,
) -> Option<TimePred> {
match tx {
TsTransform::ToUnixtime => {
let secs = expr_to_numeric(other)?;
let dt = unix_seconds_to_datetime(secs)?;
Some(TimePred::Cmp { op, ts: dt })
}
TsTransform::ToDate => {
let day0 = parse_date_literal(other)?;
let (start, end) = date_bounds_utc(day0, tz)?;
Some(match op {
Operator::Eq => TimePred::and(
TimePred::Cmp {
op: Operator::GtEq,
ts: start,
},
TimePred::Cmp {
op: Operator::Lt,
ts: end,
},
),
Operator::NotEq => TimePred::or(
TimePred::Cmp {
op: Operator::Lt,
ts: start,
},
TimePred::Cmp {
op: Operator::GtEq,
ts: end,
},
),
Operator::Lt => TimePred::Cmp {
op: Operator::Lt,
ts: start,
},
Operator::LtEq => TimePred::Cmp {
op: Operator::Lt,
ts: end,
},
Operator::Gt => TimePred::Cmp {
op: Operator::GtEq,
ts: end,
},
Operator::GtEq => TimePred::Cmp {
op: Operator::GtEq,
ts: start,
},
_ => return None,
})
}
TsTransform::DateTrunc { unit } => {
let dt = parse_ts_literal(other)?;
let (floor_utc, aligned) = floor_to_unit_tz(dt, unit, tz)?;
let use_tz_bounds = matches!(tz, Some(ParsedTz::Olson(_)))
&& matches!(unit, TruncUnit::Hour | TruncUnit::Day);
let end_utc = if use_tz_bounds {
let tz = match tz {
Some(ParsedTz::Olson(tz)) => tz,
_ => return None,
};
let orig = dt.with_timezone(tz);
let floor_local = trunc_local_naive(orig.naive_local(), unit)?;
let next_local = next_boundary_local(floor_local, unit)?;
local_to_utc(tz, next_local, orig.offset().fix())?
} else {
floor_utc + unit_duration(unit)?
};
Some(match op {
Operator::Eq => {
if !aligned {
TimePred::False
} else {
TimePred::and(
TimePred::Cmp {
op: Operator::GtEq,
ts: floor_utc,
},
TimePred::Cmp {
op: Operator::Lt,
ts: end_utc,
},
)
}
}
Operator::NotEq => {
if !aligned {
TimePred::True
} else {
TimePred::or(
TimePred::Cmp {
op: Operator::Lt,
ts: floor_utc,
},
TimePred::Cmp {
op: Operator::GtEq,
ts: end_utc,
},
)
}
}
Operator::Gt => TimePred::Cmp {
op: Operator::GtEq,
ts: end_utc,
},
Operator::GtEq => TimePred::Cmp {
op: Operator::GtEq,
ts: if aligned { floor_utc } else { end_utc },
},
Operator::Lt => TimePred::Cmp {
op: Operator::Lt,
ts: if aligned { floor_utc } else { end_utc },
},
Operator::LtEq => TimePred::Cmp {
op: Operator::Lt,
ts: end_utc,
},
_ => return None,
})
}
TsTransform::DateBin { stride, origin } => {
let dt = parse_ts_literal(other)?;
let (start, end, aligned) = match stride {
BinStride::FixedNanos(stride_nanos) => {
if stride_nanos <= 0 {
return None;
}
let src_ns = datetime_to_nanos(dt)?;
let origin_ns = datetime_to_nanos(origin)?;
let bin_ns = date_bin_nanos_interval(stride_nanos, src_ns, origin_ns)?;
let start = datetime_from_nanos(bin_ns)?;
let end = datetime_from_nanos(bin_ns.checked_add(stride_nanos)?)?;
let aligned = start == dt;
(start, end, aligned)
}
BinStride::Months(stride_months) => {
if stride_months <= 0 {
return None;
}
let start = date_bin_months_interval(stride_months, dt, origin)?;
let end = add_months_checked(start, stride_months)?;
let aligned = start == dt;
(start, end, aligned)
}
};
Some(match op {
Operator::Eq => {
if !aligned {
TimePred::False
} else {
TimePred::and(
TimePred::Cmp {
op: Operator::GtEq,
ts: start,
},
TimePred::Cmp {
op: Operator::Lt,
ts: end,
},
)
}
}
Operator::NotEq => {
if !aligned {
TimePred::True
} else {
TimePred::or(
TimePred::Cmp {
op: Operator::Lt,
ts: start,
},
TimePred::Cmp {
op: Operator::GtEq,
ts: end,
},
)
}
}
Operator::Gt => TimePred::Cmp {
op: Operator::GtEq,
ts: end,
},
Operator::GtEq => TimePred::Cmp {
op: Operator::GtEq,
ts: if aligned { start } else { end },
},
Operator::Lt => TimePred::Cmp {
op: Operator::Lt,
ts: if aligned { start } else { end },
},
Operator::LtEq => TimePred::Cmp {
op: Operator::Lt,
ts: end,
},
_ => return None,
})
}
}
}
fn compile_time_leaf_from_binary(
left: &Expr,
op: Operator,
right: &Expr,
ts_col: &str,
tz: Option<&ParsedTz>,
) -> TimePred {
if !matches!(
op,
Operator::Gt
| Operator::GtEq
| Operator::Lt
| Operator::LtEq
| Operator::Eq
| Operator::NotEq
) {
return TimePred::Unknown;
}
if expr_is_ts(left, ts_col)
&& let Some(dt) = parse_ts_literal(right)
{
return TimePred::Cmp { op, ts: dt };
}
if expr_is_ts(right, ts_col)
&& let Some(dt) = parse_ts_literal(left)
&& let Some(flop) = flip_op(op)
{
return TimePred::Cmp { op: flop, ts: dt };
}
if let Some(net_iv) = extract_ts_with_interval(left, ts_col)
&& let Some(dt) = parse_ts_literal(right)
{
if let Some(shifted) = add_interval(dt, net_iv, -1) {
return TimePred::Cmp { op, ts: shifted };
}
}
if let Some(net_iv) = extract_ts_with_interval(right, ts_col)
&& let Some(dt) = parse_ts_literal(left)
&& let Some(flop) = flip_op(op)
&& let Some(shifted) = add_interval(dt, net_iv, -1)
{
return TimePred::Cmp {
op: flop,
ts: shifted,
};
}
if let Some(tx) = match_ts_transform(left, ts_col)
&& let Some(tp) = compile_transform_cmp(tx, op, right, tz)
{
return tp;
}
if let Some(tx) = match_ts_transform(right, ts_col)
&& let Some(flop) = flip_op(op)
&& let Some(tp) = compile_transform_cmp(tx, flop, left, tz)
{
return tp;
}
TimePred::Unknown
}
pub(crate) fn compile_time_pred(expr: &Expr, ts_col: &str, tz: Option<&ParsedTz>) -> TimePred {
if !expr_mentions_ts(expr, ts_col) {
return TimePred::NonTime;
}
match expr {
Expr::BinaryExpr(be) => {
if be.op == Operator::And {
return TimePred::and(
compile_time_pred(&be.left, ts_col, tz),
compile_time_pred(&be.right, ts_col, tz),
);
}
if be.op == Operator::Or {
return TimePred::or(
compile_time_pred(&be.left, ts_col, tz),
compile_time_pred(&be.right, ts_col, tz),
);
}
compile_time_leaf_from_binary(&be.left, be.op, &be.right, ts_col, tz)
}
Expr::Not(e) => TimePred::not(compile_time_pred(e, ts_col, tz)),
Expr::Literal(v, _) => match scalar_to_bool(v) {
Some(true) => TimePred::True,
Some(false) => TimePred::False,
None => TimePred::Unknown,
},
Expr::Alias(a) => compile_time_pred(&a.expr, ts_col, tz),
Expr::Cast(c) => compile_time_pred(&c.expr, ts_col, tz),
Expr::TryCast(c) => compile_time_pred(&c.expr, ts_col, tz),
Expr::Between(b) => compile_between(b, ts_col),
Expr::InList(il) => compile_in_list(il, ts_col),
_ => TimePred::Unknown,
}
}