use std::any::Any;
use std::sync::Arc;
use datafusion::arrow::array::{
ArrayRef, DurationMicrosecondArray, Int64Array, IntervalDayTimeArray, IntervalYearMonthArray,
};
use datafusion::arrow::datatypes::{DataType, IntervalDayTime, IntervalUnit, TimeUnit};
use datafusion::common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
};
/// Scalar UDF `hamelin_to_millis`: turns a day-time interval or a microsecond
/// duration into a plain `Int64` count of milliseconds.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToMillisUdf {
    /// One argument, either `Interval(DayTime)` or `Duration(Microsecond)`.
    signature: Signature,
}

impl ToMillisUdf {
    /// Creates the UDF with its fixed, immutable signature.
    pub fn new() -> Self {
        let accepted_inputs = vec![
            TypeSignature::Exact(vec![DataType::Interval(IntervalUnit::DayTime)]),
            TypeSignature::Exact(vec![DataType::Duration(TimeUnit::Microsecond)]),
        ];
        Self {
            signature: Signature::one_of(accepted_inputs, Volatility::Immutable),
        }
    }
}

impl Default for ToMillisUdf {
    /// Same as [`ToMillisUdf::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ScalarUDFImpl for ToMillisUdf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"hamelin_to_millis"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
if args.len() != 1 {
return exec_err!("to_millis expects exactly 1 argument, got {}", args.len());
}
match &args[0] {
ColumnarValue::Scalar(scalar) => match scalar {
ScalarValue::IntervalDayTime(opt_interval) => {
let millis = opt_interval.map(|interval| {
(interval.days as i64) * 86_400_000 + (interval.milliseconds as i64)
});
Ok(ColumnarValue::Scalar(ScalarValue::Int64(millis)))
}
ScalarValue::DurationMicrosecond(opt_micros) => {
let millis = opt_micros.map(|micros| micros / 1_000);
Ok(ColumnarValue::Scalar(ScalarValue::Int64(millis)))
}
_ => exec_err!(
"to_millis expects IntervalDayTime or DurationMicrosecond, got {:?}",
scalar
),
},
ColumnarValue::Array(array) => {
if let Some(interval_array) = array.as_any().downcast_ref::<IntervalDayTimeArray>()
{
let result: Int64Array = interval_array
.iter()
.map(|opt| {
opt.map(|v| (v.days as i64) * 86_400_000 + (v.milliseconds as i64))
})
.collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
} else if let Some(duration_array) =
array.as_any().downcast_ref::<DurationMicrosecondArray>()
{
let result: Int64Array = duration_array
.iter()
.map(|opt| opt.map(|micros| micros / 1_000))
.collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
} else {
exec_err!(
"to_millis expects IntervalDayTimeArray or DurationMicrosecondArray, got {:?}",
array.data_type()
)
}
}
}
}
}
/// Wraps [`ToMillisUdf`] (`hamelin_to_millis`) in a DataFusion [`ScalarUDF`].
pub fn to_millis_udf() -> ScalarUDF {
    let implementation = ToMillisUdf::default();
    ScalarUDF::new_from_impl(implementation)
}
/// Scalar UDF `hamelin_to_nanos`: turns a day-time interval or a microsecond
/// duration into an `Int64` count of nanoseconds.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToNanosUdf {
    /// One argument, either `Interval(DayTime)` or `Duration(Microsecond)`.
    signature: Signature,
}

impl ToNanosUdf {
    /// Creates the UDF with its fixed, immutable signature.
    pub fn new() -> Self {
        let day_time = TypeSignature::Exact(vec![DataType::Interval(IntervalUnit::DayTime)]);
        let micros = TypeSignature::Exact(vec![DataType::Duration(TimeUnit::Microsecond)]);
        Self {
            signature: Signature::one_of(vec![day_time, micros], Volatility::Immutable),
        }
    }
}

impl Default for ToNanosUdf {
    /// Same as [`ToNanosUdf::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ScalarUDFImpl for ToNanosUdf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"hamelin_to_nanos"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
if args.len() != 1 {
return exec_err!("to_nanos expects exactly 1 argument, got {}", args.len());
}
match &args[0] {
ColumnarValue::Scalar(scalar) => match scalar {
ScalarValue::IntervalDayTime(opt_interval) => {
let nanos = opt_interval.map(|interval| {
(interval.days as i64) * 86_400_000_000_000
+ (interval.milliseconds as i64) * 1_000_000
});
Ok(ColumnarValue::Scalar(ScalarValue::Int64(nanos)))
}
ScalarValue::DurationMicrosecond(opt_micros) => {
let nanos = opt_micros.map(|micros| micros * 1_000);
Ok(ColumnarValue::Scalar(ScalarValue::Int64(nanos)))
}
_ => exec_err!(
"to_nanos expects IntervalDayTime or DurationMicrosecond, got {:?}",
scalar
),
},
ColumnarValue::Array(array) => {
if let Some(interval_array) = array.as_any().downcast_ref::<IntervalDayTimeArray>()
{
let result: Int64Array = interval_array
.iter()
.map(|opt| {
opt.map(|v| {
(v.days as i64) * 86_400_000_000_000
+ (v.milliseconds as i64) * 1_000_000
})
})
.collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
} else if let Some(duration_array) =
array.as_any().downcast_ref::<DurationMicrosecondArray>()
{
let result: Int64Array = duration_array
.iter()
.map(|opt| opt.map(|micros| micros * 1_000))
.collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
} else {
exec_err!(
"to_nanos expects IntervalDayTimeArray or DurationMicrosecondArray, got {:?}",
array.data_type()
)
}
}
}
}
}
/// Wraps [`ToNanosUdf`] (`hamelin_to_nanos`) in a DataFusion [`ScalarUDF`].
pub fn to_nanos_udf() -> ScalarUDF {
    let implementation = ToNanosUdf::default();
    ScalarUDF::new_from_impl(implementation)
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct FromMillisUdf {
signature: Signature,
}
impl Default for FromMillisUdf {
fn default() -> Self {
Self::new()
}
}
impl FromMillisUdf {
pub fn new() -> Self {
Self {
signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
}
}
}
impl ScalarUDFImpl for FromMillisUdf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"hamelin_from_millis"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Interval(IntervalUnit::DayTime))
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
if args.len() != 1 {
return exec_err!("from_millis expects exactly 1 argument, got {}", args.len());
}
match &args[0] {
ColumnarValue::Scalar(scalar) => {
if let ScalarValue::Int64(opt_millis) = scalar {
match opt_millis {
Some(millis) => {
let days = (*millis / 86_400_000) as i32;
let remaining_millis = (*millis % 86_400_000) as i32;
let interval = IntervalDayTime::new(days, remaining_millis);
Ok(ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
interval,
))))
}
None => Ok(ColumnarValue::Scalar(ScalarValue::IntervalDayTime(None))),
}
} else {
exec_err!("from_millis expects Int64, got {:?}", scalar)
}
}
ColumnarValue::Array(array) => {
let int_array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
DataFusionError::Execution(format!(
"from_millis expects Int64Array, got {:?}",
array.data_type()
))
})?;
let result: IntervalDayTimeArray = int_array
.iter()
.map(|opt| {
opt.map(|millis| {
let days = (millis / 86_400_000) as i32;
let remaining_millis = (millis % 86_400_000) as i32;
IntervalDayTime::new(days, remaining_millis)
})
})
.collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
}
}
}
}
/// Wraps [`FromMillisUdf`] (`hamelin_from_millis`) in a DataFusion [`ScalarUDF`].
pub fn from_millis_udf() -> ScalarUDF {
    let implementation = FromMillisUdf::default();
    ScalarUDF::new_from_impl(implementation)
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct FromNanosUdf {
signature: Signature,
}
impl Default for FromNanosUdf {
fn default() -> Self {
Self::new()
}
}
impl FromNanosUdf {
pub fn new() -> Self {
Self {
signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
}
}
}
impl ScalarUDFImpl for FromNanosUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hamelin_from_nanos"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always `Interval(DayTime)`.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Interval(IntervalUnit::DayTime))
    }

    /// Converts the single `Int64` nanosecond argument to a day-time interval.
    ///
    /// Sub-millisecond precision is dropped, because integer division truncates toward
    /// zero. Nulls stay null. Any other argument type yields an execution error.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        const NANOS_PER_MILLI: i64 = 1_000_000;
        const MILLIS_PER_DAY: i64 = 86_400_000;
        // |i64::MAX / 1e6 / 86_400_000| is about 106_751, so the day cast is lossless,
        // and the remainder is always below 86_400_000.
        let nanos_to_interval = |nanos: i64| {
            let whole_millis = nanos / NANOS_PER_MILLI;
            IntervalDayTime::new(
                (whole_millis / MILLIS_PER_DAY) as i32,
                (whole_millis % MILLIS_PER_DAY) as i32,
            )
        };

        let inputs = args.args;
        if inputs.len() != 1 {
            return exec_err!("from_nanos expects exactly 1 argument, got {}", inputs.len());
        }

        match &inputs[0] {
            ColumnarValue::Scalar(ScalarValue::Int64(nanos)) => Ok(ColumnarValue::Scalar(
                ScalarValue::IntervalDayTime(nanos.map(nanos_to_interval)),
            )),
            ColumnarValue::Scalar(other) => {
                exec_err!("from_nanos expects Int64, got {:?}", other)
            }
            ColumnarValue::Array(array) => {
                let int_values = match array.as_any().downcast_ref::<Int64Array>() {
                    Some(values) => values,
                    None => {
                        return Err(DataFusionError::Execution(format!(
                            "from_nanos expects Int64Array, got {:?}",
                            array.data_type()
                        )))
                    }
                };
                let intervals: IntervalDayTimeArray = int_values
                    .iter()
                    .map(|n| n.map(nanos_to_interval))
                    .collect();
                Ok(ColumnarValue::Array(Arc::new(intervals) as ArrayRef))
            }
        }
    }
}
/// Wraps [`FromNanosUdf`] (`hamelin_from_nanos`) in a DataFusion [`ScalarUDF`].
pub fn from_nanos_udf() -> ScalarUDF {
    let implementation = FromNanosUdf::default();
    ScalarUDF::new_from_impl(implementation)
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToMonthsUdf {
signature: Signature,
}
impl Default for ToMonthsUdf {
fn default() -> Self {
Self::new()
}
}
impl ToMonthsUdf {
pub fn new() -> Self {
Self {
signature: Signature::exact(
vec![DataType::Interval(IntervalUnit::YearMonth)],
Volatility::Immutable,
),
}
}
}
impl ScalarUDFImpl for ToMonthsUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hamelin_to_months"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always `Int64`.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    /// Widens the `i32` month count of a year-month interval to `i64`.
    ///
    /// The widening is lossless. Nulls stay null. Any other argument type yields an
    /// execution error.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let inputs = args.args;
        if inputs.len() != 1 {
            return exec_err!("to_months expects exactly 1 argument, got {}", inputs.len());
        }

        match &inputs[0] {
            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(months)) => Ok(
                ColumnarValue::Scalar(ScalarValue::Int64(months.map(i64::from))),
            ),
            ColumnarValue::Scalar(other) => {
                exec_err!("to_months expects IntervalYearMonth, got {:?}", other)
            }
            ColumnarValue::Array(array) => {
                let year_months = match array.as_any().downcast_ref::<IntervalYearMonthArray>() {
                    Some(typed) => typed,
                    None => {
                        return Err(DataFusionError::Execution(format!(
                            "to_months expects IntervalYearMonthArray, got {:?}",
                            array.data_type()
                        )))
                    }
                };
                let widened: Int64Array =
                    year_months.iter().map(|m| m.map(i64::from)).collect();
                Ok(ColumnarValue::Array(Arc::new(widened) as ArrayRef))
            }
        }
    }
}
/// Wraps [`ToMonthsUdf`] (`hamelin_to_months`) in a DataFusion [`ScalarUDF`].
pub fn to_months_udf() -> ScalarUDF {
    let implementation = ToMonthsUdf::default();
    ScalarUDF::new_from_impl(implementation)
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct FromMonthsUdf {
signature: Signature,
}
impl Default for FromMonthsUdf {
fn default() -> Self {
Self::new()
}
}
impl FromMonthsUdf {
pub fn new() -> Self {
Self {
signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
}
}
}
impl ScalarUDFImpl for FromMonthsUdf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"hamelin_from_months"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
if args.len() != 1 {
return exec_err!("from_months expects exactly 1 argument, got {}", args.len());
}
match &args[0] {
ColumnarValue::Scalar(scalar) => {
if let ScalarValue::Int64(opt_months) = scalar {
Ok(ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(
opt_months.map(|m| m as i32),
)))
} else {
exec_err!("from_months expects Int64, got {:?}", scalar)
}
}
ColumnarValue::Array(array) => {
let int_array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
DataFusionError::Execution(format!(
"from_months expects Int64Array, got {:?}",
array.data_type()
))
})?;
let result: IntervalYearMonthArray =
int_array.iter().map(|opt| opt.map(|m| m as i32)).collect();
Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
}
}
}
}
/// Wraps [`FromMonthsUdf`] (`hamelin_from_months`) in a DataFusion [`ScalarUDF`].
pub fn from_months_udf() -> ScalarUDF {
    let implementation = FromMonthsUdf::default();
    ScalarUDF::new_from_impl(implementation)
}