use std::sync::Arc;
use anyhow::bail;
use chrono::Duration;
use hamelin_eval::value::{RangeValue, Value};
use crate::ir::IRExpression;
/// Wraps an evaluated [`Value`] in an [`IRExpression`].
///
/// The value is converted with `Into` to whatever payload type
/// `IRExpression::new` expects, and the result is placed behind an `Arc`.
fn value_to_ir_expression(value: Value) -> IRExpression {
    let shared = Arc::new(value.into());
    IRExpression::new(shared)
}
/// One endpoint of a row-based window frame, expressed as a count of rows
/// relative to the current row.
#[derive(Debug, Clone, PartialEq)]
pub enum RowBound {
/// No limit on this side of the frame; produced when a range endpoint is absent.
Unbounded,
/// The endpoint is the current row; produced from a row offset of zero.
CurrentRow,
/// The endpoint lies this many rows before the current row; produced from a
/// negative row offset, storing its magnitude.
Preceding(u64),
/// The endpoint lies this many rows after the current row; produced from a
/// positive row offset.
Following(u64),
}
/// One endpoint of a range-based window frame. Offsets are carried as
/// [`IRExpression`]s built from the evaluated offset value.
#[derive(Debug, Clone)]
pub enum RangeBound {
/// No limit on this side of the frame; produced when a range endpoint is absent.
Unbounded,
/// The endpoint is the current row; produced from an offset of zero.
CurrentRow,
/// The endpoint lies before the current row; produced from a negative offset,
/// with the expression holding the negated (positive) value.
Preceding(IRExpression),
/// The endpoint lies after the current row; produced from a positive offset.
Following(IRExpression),
}
/// A window frame: a start and an end bound of the same kind.
#[derive(Debug, Clone)]
pub enum WindowFrame {
/// Frame whose bounds are counted in rows (built from `Value::Rows` offsets).
Rows { start: RowBound, end: RowBound },
/// Frame whose bounds are value offsets (built from interval, calendar
/// interval, integer or double offsets).
Range { start: RangeBound, end: RangeBound },
}
impl WindowFrame {
pub fn from_value(value: Value) -> anyhow::Result<Self> {
match value {
Value::Interval(d) => {
let zero = Duration::zero();
let (start, end) = if d < zero {
let abs_duration = -d;
(
RangeBound::Preceding(value_to_ir_expression(Value::Interval(
abs_duration,
))),
RangeBound::CurrentRow,
)
} else if d > zero {
(
RangeBound::CurrentRow,
RangeBound::Following(value_to_ir_expression(Value::Interval(d))),
)
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
Value::CalendarInterval(months) => {
let (start, end) = if months < 0 {
(
RangeBound::Preceding(value_to_ir_expression(Value::CalendarInterval(
-months,
))),
RangeBound::CurrentRow,
)
} else if months > 0 {
(
RangeBound::CurrentRow,
RangeBound::Following(value_to_ir_expression(Value::CalendarInterval(
months,
))),
)
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
Value::Rows(n) => {
let (start, end) = if n < 0 {
let abs_n = n.unsigned_abs();
(RowBound::Preceding(abs_n), RowBound::CurrentRow)
} else if n > 0 {
(RowBound::CurrentRow, RowBound::Following(n as u64))
} else {
(RowBound::CurrentRow, RowBound::CurrentRow)
};
Ok(WindowFrame::Rows { start, end })
}
Value::Range(rv) => {
let RangeValue { lower, upper } = *rv;
convert_range_value(lower, upper)
}
Value::Int(n) => {
let (start, end) = if n < 0 {
(
RangeBound::Preceding(value_to_ir_expression(Value::Int(-n))),
RangeBound::CurrentRow,
)
} else if n > 0 {
(
RangeBound::CurrentRow,
RangeBound::Following(value_to_ir_expression(Value::Int(n))),
)
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
Value::Double(f) => {
let (start, end) = if f < 0.0 {
(
RangeBound::Preceding(value_to_ir_expression(Value::Double(-f))),
RangeBound::CurrentRow,
)
} else if f > 0.0 {
(
RangeBound::CurrentRow,
RangeBound::Following(value_to_ir_expression(Value::Double(f))),
)
} else {
(RangeBound::CurrentRow, RangeBound::CurrentRow)
};
Ok(WindowFrame::Range { start, end })
}
_ => bail!("Cannot convert {} to window frame", value.type_name()),
}
}
}
/// Which flavour of frame a bound value belongs to; used to check that both
/// endpoints of a range agree before converting them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FrameKind {
/// The value is a `Value::Rows` offset.
Rows,
/// The value is anything other than `Value::Rows`.
Range,
}
/// Classifies a bound value: `Value::Rows` is a row bound, every other value
/// is treated as a range bound.
fn value_frame_kind(v: &Value) -> FrameKind {
    if matches!(v, Value::Rows(_)) {
        FrameKind::Rows
    } else {
        FrameKind::Range
    }
}
fn convert_range_value(lower: Option<Value>, upper: Option<Value>) -> anyhow::Result<WindowFrame> {
let lower_kind = lower.as_ref().map(value_frame_kind);
let upper_kind = upper.as_ref().map(value_frame_kind);
let frame_kind = match (lower_kind, upper_kind) {
(Some(t1), Some(t2)) if t1 == t2 => t1,
(Some(t1), Some(t2)) => {
bail!("Mismatched frame bound types: {:?} and {:?}", t1, t2);
}
(Some(t), None) | (None, Some(t)) => t,
(None, None) => FrameKind::Range,
};
match frame_kind {
FrameKind::Rows => {
let start = match lower {
None => RowBound::Unbounded,
Some(v) => convert_row_bound(v)?,
};
let end = match upper {
None => RowBound::Unbounded,
Some(v) => convert_row_bound(v)?,
};
Ok(WindowFrame::Rows { start, end })
}
FrameKind::Range => {
let start = match lower {
None => RangeBound::Unbounded,
Some(v) => convert_range_bound(v)?,
};
let end = match upper {
None => RangeBound::Unbounded,
Some(v) => convert_range_bound(v)?,
};
Ok(WindowFrame::Range { start, end })
}
}
}
/// Converts a `Value::Rows` offset into a [`RowBound`].
///
/// Zero maps to `CurrentRow`, a negative count to `Preceding` with its
/// magnitude, and a positive count to `Following`.
///
/// # Errors
///
/// Fails for any value that is not `Value::Rows`.
fn convert_row_bound(v: Value) -> anyhow::Result<RowBound> {
    match v {
        Value::Rows(0) => Ok(RowBound::CurrentRow),
        Value::Rows(count) if count < 0 => Ok(RowBound::Preceding(count.unsigned_abs())),
        // Only strictly positive counts reach this arm, so the cast is lossless.
        Value::Rows(count) => Ok(RowBound::Following(count as u64)),
        _ => bail!("Cannot convert {} to row bound", v.type_name()),
    }
}
/// Converts one endpoint of a range-based frame into a [`RangeBound`].
///
/// Accepts `Value::Interval`, `Value::CalendarInterval`, `Value::Int` and
/// `Value::Double`. A negative offset becomes `Preceding` carrying the negated
/// (positive) value, a positive offset becomes `Following` carrying the value
/// unchanged, and zero becomes `CurrentRow`.
///
/// A NaN `Double` compares false against zero in both directions and so is
/// treated like zero (`CurrentRow`).
///
/// # Errors
///
/// Fails for any other value type, or when a negative `Int` /
/// `CalendarInterval` offset is the minimum of its integer type, so its
/// magnitude cannot be represented.
fn convert_range_bound(v: Value) -> anyhow::Result<RangeBound> {
    match &v {
        Value::Interval(d) => {
            let zero = Duration::zero();
            if *d < zero {
                Ok(RangeBound::Preceding(value_to_ir_expression(
                    Value::Interval(-*d),
                )))
            } else if *d > zero {
                Ok(RangeBound::Following(value_to_ir_expression(v)))
            } else {
                Ok(RangeBound::CurrentRow)
            }
        }
        Value::CalendarInterval(months) => {
            if *months < 0 {
                // Plain negation overflows for the type's minimum value
                // (panic in debug builds, still-negative result in release).
                let magnitude = match (*months).checked_neg() {
                    Some(m) => m,
                    None => bail!(
                        "Calendar interval of {} months is out of range for a range bound",
                        months
                    ),
                };
                Ok(RangeBound::Preceding(value_to_ir_expression(
                    Value::CalendarInterval(magnitude),
                )))
            } else if *months > 0 {
                Ok(RangeBound::Following(value_to_ir_expression(v)))
            } else {
                Ok(RangeBound::CurrentRow)
            }
        }
        Value::Int(n) => {
            if *n < 0 {
                // Same overflow hazard as for calendar intervals above.
                let magnitude = match (*n).checked_neg() {
                    Some(m) => m,
                    None => bail!("Integer offset {} is out of range for a range bound", n),
                };
                Ok(RangeBound::Preceding(value_to_ir_expression(Value::Int(
                    magnitude,
                ))))
            } else if *n > 0 {
                Ok(RangeBound::Following(value_to_ir_expression(v)))
            } else {
                Ok(RangeBound::CurrentRow)
            }
        }
        Value::Double(f) => {
            if *f < 0.0 {
                Ok(RangeBound::Preceding(value_to_ir_expression(
                    Value::Double(-*f),
                )))
            } else if *f > 0.0 {
                Ok(RangeBound::Following(value_to_ir_expression(v)))
            } else {
                Ok(RangeBound::CurrentRow)
            }
        }
        _ => bail!("Cannot convert {} to range bound", v.type_name()),
    }
}
// Unit tests for converting evaluated values into window frames.
#[cfg(test)]
mod tests {
use hamelin_lib::tree::ast::expression::IntervalUnit;
use super::*;
// Asserts that `expr` is an interval literal in millisecond units whose value
// equals `expected_millis`; panics on any other expression kind.
fn assert_interval_expr(expr: &IRExpression, expected_millis: i64) {
match &expr.inner().ast.kind {
hamelin_lib::tree::ast::expression::ExpressionKind::IntervalLiteral(lit) => {
assert_eq!(lit.unit, IntervalUnit::Millisecond);
assert_eq!(lit.value, expected_millis);
}
other => panic!("Expected IntervalLiteral, got {:?}", other),
}
}
// Asserts that `expr` is an interval literal in month units whose value equals
// `expected_months`; panics on any other expression kind.
fn assert_calendar_interval_expr(expr: &IRExpression, expected_months: i64) {
match &expr.inner().ast.kind {
hamelin_lib::tree::ast::expression::ExpressionKind::IntervalLiteral(lit) => {
assert_eq!(lit.unit, IntervalUnit::Month);
assert_eq!(lit.value, expected_months);
}
other => panic!("Expected IntervalLiteral (month), got {:?}", other),
}
}
// Asserts that `expr` is an integer literal equal to `expected`; panics on any
// other expression kind.
fn assert_int_expr(expr: &IRExpression, expected: i64) {
match &expr.inner().ast.kind {
hamelin_lib::tree::ast::expression::ExpressionKind::IntLiteral(lit) => {
assert_eq!(lit.int, expected);
}
other => panic!("Expected IntLiteral, got {:?}", other),
}
}
// A -2h interval becomes a range frame from 2h PRECEDING to CURRENT ROW.
#[test]
fn test_negative_interval_to_frame() {
let v = Value::Interval(Duration::hours(-2));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
match &start {
RangeBound::Preceding(expr) => {
assert_interval_expr(expr, 2 * 3600 * 1_000);
}
other => panic!("Expected Preceding, got {:?}", other),
}
assert!(matches!(end, RangeBound::CurrentRow));
}
_ => panic!("Expected Range frame"),
}
}
// A +1h interval becomes a range frame from CURRENT ROW to 1h FOLLOWING.
#[test]
fn test_positive_interval_to_frame() {
let v = Value::Interval(Duration::hours(1));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
assert!(matches!(start, RangeBound::CurrentRow));
match &end {
RangeBound::Following(expr) => {
assert_interval_expr(expr, 3600 * 1_000);
}
other => panic!("Expected Following, got {:?}", other),
}
}
_ => panic!("Expected Range frame"),
}
}
// A zero interval becomes a range frame covering only the current row.
#[test]
fn test_zero_interval_to_frame() {
let v = Value::Interval(Duration::zero());
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
assert!(matches!(start, RangeBound::CurrentRow));
assert!(matches!(end, RangeBound::CurrentRow));
}
_ => panic!("Expected Range frame"),
}
}
// A row offset of -5 becomes a rows frame from 5 PRECEDING to CURRENT ROW.
#[test]
fn test_negative_rows_to_frame() {
let v = Value::Rows(-5);
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Rows { start, end } => {
assert_eq!(start, RowBound::Preceding(5));
assert_eq!(end, RowBound::CurrentRow);
}
_ => panic!("Expected Rows frame"),
}
}
// A range of two negative intervals (-2h, -1h) converts each endpoint
// independently: 2h PRECEDING to 1h PRECEDING.
#[test]
fn test_range_interval_to_frame() {
let lower = Value::Interval(Duration::hours(-2));
let upper = Value::Interval(Duration::hours(-1));
let v = Value::Range(Box::new(RangeValue {
lower: Some(lower),
upper: Some(upper),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
match &start {
RangeBound::Preceding(expr) => {
assert_interval_expr(expr, 2 * 3600 * 1_000);
}
other => panic!("Expected Preceding, got {:?}", other),
}
match &end {
RangeBound::Preceding(expr) => {
assert_interval_expr(expr, 3600 * 1_000);
}
other => panic!("Expected Preceding, got {:?}", other),
}
}
_ => panic!("Expected Range frame"),
}
}
// A range with no lower endpoint and a zero-interval upper endpoint becomes
// UNBOUNDED to CURRENT ROW.
#[test]
fn test_unbounded_preceding_to_zero() {
let v = Value::Range(Box::new(RangeValue {
lower: None,
upper: Some(Value::Interval(Duration::zero())),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
assert!(matches!(start, RangeBound::Unbounded));
assert!(matches!(end, RangeBound::CurrentRow));
}
_ => panic!("Expected Range frame"),
}
}
// A range of row offsets (-10, 10) becomes a rows frame from 10 PRECEDING to
// 10 FOLLOWING.
#[test]
fn test_rows_range() {
let v = Value::Range(Box::new(RangeValue {
lower: Some(Value::Rows(-10)),
upper: Some(Value::Rows(10)),
}));
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Rows { start, end } => {
assert_eq!(start, RowBound::Preceding(10));
assert_eq!(end, RowBound::Following(10));
}
_ => panic!("Expected Rows frame"),
}
}
// An integer offset of -3 becomes a range frame from 3 PRECEDING to CURRENT ROW.
#[test]
fn test_negative_int_to_frame() {
let v = Value::Int(-3);
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
match &start {
RangeBound::Preceding(expr) => assert_int_expr(expr, 3),
other => panic!("Expected Preceding, got {:?}", other),
}
assert!(matches!(end, RangeBound::CurrentRow));
}
_ => panic!("Expected Range frame"),
}
}
// A calendar interval of -2 becomes a range frame from 2 months PRECEDING to
// CURRENT ROW.
#[test]
fn test_calendar_interval_to_frame() {
let v = Value::CalendarInterval(-2);
let frame = WindowFrame::from_value(v).unwrap();
match frame {
WindowFrame::Range { start, end } => {
match &start {
RangeBound::Preceding(expr) => {
assert_calendar_interval_expr(expr, 2);
}
other => panic!("Expected Preceding, got {:?}", other),
}
assert!(matches!(end, RangeBound::CurrentRow));
}
_ => panic!("Expected Range frame"),
}
}
}