use chrono::Duration;
use hamelin_eval::value::{RangeValue, Value};
#[derive(Debug, Clone, PartialEq)]
pub enum TimeOffset {
Interval(Duration),
CalendarInterval(i32),
}
#[derive(Debug, Clone, PartialEq)]
pub enum RowBound {
Unbounded,
CurrentRow,
Preceding(u64),
Following(u64),
}
#[derive(Debug, Clone, PartialEq)]
pub enum RangeBound {
Unbounded,
CurrentRow,
Preceding(Value),
Following(Value),
}
#[derive(Debug, Clone, PartialEq)]
pub enum TimeRangeBound {
Unbounded,
CurrentRow,
Preceding(TimeOffset),
Following(TimeOffset),
}
#[derive(Debug, Clone)]
pub enum WindowFrame {
Rows { start: RowBound, end: RowBound },
Range { start: RangeBound, end: RangeBound },
TimeRange {
start: TimeRangeBound,
end: TimeRangeBound,
},
}
impl WindowFrame {
pub fn from_value(value: Value) -> Result<Self, String> {
match value {
Value::Interval(d) => {
let zero = Duration::zero();
let (start, end) = if d < zero {
let abs_duration = -d;
(
TimeRangeBound::Preceding(TimeOffset::Interval(abs_duration)),
TimeRangeBound::CurrentRow,
)
} else if d > zero {
(
TimeRangeBound::CurrentRow,
TimeRangeBound::Following(TimeOffset::Interval(d)),
)
} else {
(TimeRangeBound::CurrentRow, TimeRangeBound::CurrentRow)
};
Ok(WindowFrame::TimeRange { start, end })
}
Value::CalendarInterval(months) => {
let (start, end) = if months < 0 {
(
TimeRangeBound::Preceding(TimeOffset::CalendarInterval(-months)),
TimeRangeBound::CurrentRow,
)
} else if months > 0 {
(
TimeRangeBound::CurrentRow,
TimeRangeBound::Following(TimeOffset::CalendarInterval(months)),
)
} else {
(TimeRangeBound::CurrentRow, TimeRangeBound::CurrentRow)
};
Ok(WindowFrame::TimeRange { start, end })
}
Value::Rows(n) => {
let (start, end) = if n < 0 {
let abs_n = n.unsigned_abs();
(RowBound::Preceding(abs_n), RowBound::CurrentRow)
} else if n > 0 {
(RowBound::CurrentRow, RowBound::Following(n as u64))
} else {
(RowBound::CurrentRow, RowBound::CurrentRow)
};
Ok(WindowFrame::Rows { start, end })
}
Value::Range(rv) => {
let RangeValue { lower, upper } = *rv;
convert_range_value(lower, upper)
}
Value::Int(n) => {
let (start, end) = if n < 0 {
(
RangeBound::Preceding(Value::Int(-n)),
RangeBound::CurrentRow,
)
} else if n > 0 {
(RangeBound::CurrentRow, RangeBound::Following(Value::Int(n)))
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
Value::Double(f) => {
let (start, end) = if f < 0.0 {
(
RangeBound::Preceding(Value::Double(-f)),
RangeBound::CurrentRow,
)
} else if f > 0.0 {
(
RangeBound::CurrentRow,
RangeBound::Following(Value::Double(f)),
)
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
_ => Err(format!(
"Cannot convert {} to window frame",
value.type_name()
)),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FrameType {
Rows,
TimeRange,
Range,
}
fn value_frame_type(v: &Value) -> Result<FrameType, String> {
match v {
Value::Rows(_) => Ok(FrameType::Rows),
Value::Interval(_) | Value::CalendarInterval(_) => Ok(FrameType::TimeRange),
_ => Ok(FrameType::Range),
}
}
fn convert_range_value(lower: Option<Value>, upper: Option<Value>) -> Result<WindowFrame, String> {
let lower_type = lower.as_ref().map(value_frame_type).transpose()?;
let upper_type = upper.as_ref().map(value_frame_type).transpose()?;
let frame_type = match (lower_type, upper_type) {
(Some(t1), Some(t2)) if t1 == t2 => t1,
(Some(t1), Some(t2)) => {
return Err(format!(
"Mismatched frame bound types: {:?} and {:?}",
t1, t2
));
}
(Some(t), None) | (None, Some(t)) => t,
(None, None) => {
FrameType::TimeRange
}
};
match frame_type {
FrameType::Rows => {
let start = match lower {
None => RowBound::Unbounded,
Some(v) => convert_row_bound(v)?,
};
let end = match upper {
None => RowBound::Unbounded,
Some(v) => convert_row_bound(v)?,
};
Ok(WindowFrame::Rows { start, end })
}
FrameType::TimeRange => {
let start = match lower {
None => TimeRangeBound::Unbounded,
Some(v) => convert_time_range_bound(v)?,
};
let end = match upper {
None => TimeRangeBound::Unbounded,
Some(v) => convert_time_range_bound(v)?,
};
Ok(WindowFrame::TimeRange { start, end })
}
FrameType::Range => {
let start = match lower {
None => RangeBound::Unbounded,
Some(v) => convert_generic_range_bound(v)?,
};
let end = match upper {
None => RangeBound::Unbounded,
Some(v) => convert_generic_range_bound(v)?,
};
Ok(WindowFrame::Range { start, end })
}
}
}
fn convert_row_bound(v: Value) -> Result<RowBound, String> {
match v {
Value::Rows(n) => {
if n < 0 {
Ok(RowBound::Preceding(n.unsigned_abs()))
} else if n > 0 {
Ok(RowBound::Following(n as u64))
} else {
Ok(RowBound::Following(0))
}
}
_ => Err(format!("Cannot convert {} to row bound", v.type_name())),
}
}
fn convert_time_range_bound(v: Value) -> Result<TimeRangeBound, String> {
match v {
Value::Interval(d) => {
let zero = Duration::zero();
if d < zero {
Ok(TimeRangeBound::Preceding(TimeOffset::Interval(-d)))
} else if d > zero {
Ok(TimeRangeBound::Following(TimeOffset::Interval(d)))
} else {
Ok(TimeRangeBound::Following(TimeOffset::Interval(
Duration::zero(),
)))
}
}
Value::CalendarInterval(months) => {
if months < 0 {
Ok(TimeRangeBound::Preceding(TimeOffset::CalendarInterval(
-months,
)))
} else {
Ok(TimeRangeBound::Following(TimeOffset::CalendarInterval(
months,
)))
}
}
_ => Err(format!(
"Cannot convert {} to time range bound",
v.type_name()
)),
}
}
fn convert_generic_range_bound(v: Value) -> Result<RangeBound, String> {
match &v {
Value::Int(n) => {
if *n < 0 {
Ok(RangeBound::Preceding(Value::Int(-*n)))
} else if *n > 0 {
Ok(RangeBound::Following(v))
} else {
Ok(RangeBound::Following(Value::Int(0)))
}
}
Value::Double(f) => {
if *f < 0.0 {
Ok(RangeBound::Preceding(Value::Double(-*f)))
} else if *f > 0.0 {
Ok(RangeBound::Following(v))
} else {
Ok(RangeBound::Following(Value::Double(0.0)))
}
}
_ => Err(format!(
"Cannot convert {} to range bound - unsupported type for generic range",
v.type_name()
)),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_negative_interval_to_frame() {
let v = Value::Interval(Duration::hours(-2));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::TimeRange { start, end } => {
assert!(matches!(
start,
TimeRangeBound::Preceding(TimeOffset::Interval(d)) if d == Duration::hours(2)
));
assert_eq!(end, TimeRangeBound::CurrentRow);
}
_ => panic!("Expected TimeRange frame"),
}
}
#[test]
fn test_positive_interval_to_frame() {
let v = Value::Interval(Duration::hours(1));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::TimeRange { start, end } => {
assert_eq!(start, TimeRangeBound::CurrentRow);
assert!(matches!(
end,
TimeRangeBound::Following(TimeOffset::Interval(d)) if d == Duration::hours(1)
));
}
_ => panic!("Expected TimeRange frame"),
}
}
#[test]
fn test_zero_interval_to_frame() {
let v = Value::Interval(Duration::zero());
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::TimeRange { start, end } => {
assert_eq!(start, TimeRangeBound::CurrentRow);
assert_eq!(end, TimeRangeBound::CurrentRow);
}
_ => panic!("Expected TimeRange frame"),
}
}
#[test]
fn test_negative_rows_to_frame() {
let v = Value::Rows(-5);
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Rows { start, end } => {
assert_eq!(start, RowBound::Preceding(5));
assert_eq!(end, RowBound::CurrentRow);
}
_ => panic!("Expected Rows frame"),
}
}
#[test]
fn test_range_interval_to_frame() {
let lower = Value::Interval(Duration::hours(-2));
let upper = Value::Interval(Duration::hours(-1));
let v = Value::Range(Box::new(RangeValue {
lower: Some(lower),
upper: Some(upper),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::TimeRange { start, end } => {
assert!(matches!(
start,
TimeRangeBound::Preceding(TimeOffset::Interval(d)) if d == Duration::hours(2)
));
assert!(matches!(
end,
TimeRangeBound::Preceding(TimeOffset::Interval(d)) if d == Duration::hours(1)
));
}
_ => panic!("Expected TimeRange frame"),
}
}
#[test]
fn test_unbounded_preceding_to_zero() {
let v = Value::Range(Box::new(RangeValue {
lower: None,
upper: Some(Value::Interval(Duration::zero())),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::TimeRange { start, end } => {
assert_eq!(start, TimeRangeBound::Unbounded);
assert!(matches!(
end,
TimeRangeBound::Following(TimeOffset::Interval(d)) if d == Duration::zero()
));
}
_ => panic!("Expected TimeRange frame"),
}
}
#[test]
fn test_rows_range() {
let v = Value::Range(Box::new(RangeValue {
lower: Some(Value::Rows(-10)),
upper: Some(Value::Rows(10)),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Rows { start, end } => {
assert_eq!(start, RowBound::Preceding(10));
assert_eq!(end, RowBound::Following(10));
}
_ => panic!("Expected Rows frame"),
}
}
}