use data_value::{DataValue, Extract};
use crate::filter::{
error::Error, DataInput, Expression, FilterArgument, FilterCombinantion, FilterOperator,
Filtering, Function,
};
/// Evaluates `value <operator> right` and returns the row index on a match.
///
/// # Arguments
/// * `i` - index that is handed back unchanged when the comparison holds.
/// * `value` - left-hand operand of the comparison.
/// * `right` - right-hand operand; its `value()`, `vec()` or `regex()` view
///   is consulted depending on the operator.
/// * `operator` - the comparison to perform.
///
/// # Returns
/// `Some(i)` when the comparison holds, `None` otherwise. In particular:
/// * `In` / `NotIn` both yield `None` when `right.vec()` is `None`.
/// * `Regex` yields `None` unless `right.regex()` is `Some` and `value` is a
///   `DataValue::String`.
pub fn match_operator(
    i: usize,
    value: &DataValue,
    right: &FilterArgument,
    operator: FilterOperator,
) -> Option<usize> {
    tracing::trace!(
        "Matching operator: {:?} for value: {:?} and right: {:?}",
        operator,
        value,
        right
    );

    // Reduce every operator to a single boolean, then map it onto the index.
    let is_match = match operator {
        FilterOperator::Equal => value.eq(right.value()),
        FilterOperator::NotEqual => !value.eq(right.value()),
        FilterOperator::Greater => value.gt(right.value()),
        FilterOperator::Less => value.lt(right.value()),
        FilterOperator::GrOrEq => value.ge(right.value()),
        FilterOperator::LeOrEq => value.le(right.value()),
        FilterOperator::In => match right.vec() {
            Some(candidates) => candidates.contains(value),
            None => false,
        },
        // A missing list is a non-match for `NotIn` as well, not a vacuous match.
        FilterOperator::NotIn => match right.vec() {
            Some(candidates) => !candidates.contains(value),
            None => false,
        },
        FilterOperator::Regex => match (right.regex(), value) {
            (Some(pattern), DataValue::String(text)) => pattern.is_match(text),
            _ => false,
        },
    };

    is_match.then_some(i)
}
/// Converts a value into a microsecond timestamp wrapped in a `DataValue`.
///
/// * `DataValue::String` is parsed with the format `"%Y-%m-%d %H:%M:%S"`,
///   interpreted as UTC and converted to microseconds; a string that does not
///   parse is returned unchanged (cloned).
/// * `DataValue::I64`, `DataValue::U64` and `DataValue::F64` are cast to `i64`
///   microseconds and round-tripped through chrono; if chrono rejects the
///   value, the raw number cast to `u64` is returned instead.
/// * Any other variant is returned unchanged (cloned).
///
/// All numeric conversions use `as` casts, so negative inputs wrap when they
/// are turned into `u64`.
///
/// # Errors
/// The visible code paths all return `Ok`; no branch produces an `Err`.
pub fn from_datavalue_to_timestamp_us(data_value: &DataValue) -> Result<DataValue, Error> {
    // Round-trips `micros` through chrono; falls back to `fallback` when chrono
    // cannot represent the instant.
    fn via_chrono(micros: i64, fallback: u64) -> DataValue {
        match chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros) {
            Some(instant) => DataValue::from(instant.timestamp_micros() as u64),
            None => DataValue::U64(fallback),
        }
    }

    let converted = match data_value {
        DataValue::String(text) => {
            chrono::NaiveDateTime::parse_from_str(text.as_str(), "%Y-%m-%d %H:%M:%S")
                .map(|parsed| DataValue::from(parsed.and_utc().timestamp_micros() as u64))
                // Not a datetime string: hand the original value back untouched.
                .unwrap_or_else(|_| data_value.clone())
        }
        DataValue::I64(raw) => via_chrono(*raw, *raw as u64),
        DataValue::U64(raw) => via_chrono(*raw as i64, *raw),
        DataValue::F64(raw) => via_chrono(*raw as i64, *raw as u64),
        _ => data_value.clone(),
    };

    Ok(converted)
}
pub fn apply_function(df: &impl Filtering, expression: &Expression) -> Result<Vec<usize>, Error> {
df.apply_function(expression)
}
pub fn create_indices_from_expression(
filtered_df: &impl Filtering,
expression: &Expression,
) -> Result<Vec<usize>, Error> {
filtered_df.prepare_indicies(expression)
}
/// Evaluates a (possibly nested) filter combination against `df` and returns
/// the matching indices.
///
/// * `Simple` - indices of the single expression.
/// * `And` - the left expression's indices that also occur in the result of
///   the right combination; the order of the left indices is kept.
/// * `Or` - union of both sides, sorted ascending with duplicates removed.
/// * `Grouped` - union of all member combinations, sorted ascending with
///   duplicates removed.
///
/// # Errors
/// Propagates the first error reported while evaluating any sub-expression.
pub fn filter_combination(
    df: &impl Filtering,
    expression: &FilterCombinantion,
) -> Result<Vec<usize>, Error> {
    match expression {
        FilterCombinantion::Simple(expr) => create_indices_from_expression(df, expr),
        FilterCombinantion::And(left, right) => {
            let left_indices = create_indices_from_expression(df, left)?;
            tracing::trace!(
                "AND Left indices: {:?} for expression: {:?}",
                left_indices,
                left
            );
            let right_indices = filter_combination(df, right.as_ref())?;
            tracing::trace!(
                "AND Right indices: {:?} for expression: {:?}",
                right_indices,
                right
            );
            // Hash the right side once so each membership test is O(1) instead
            // of a linear scan per left index.
            let right_lookup: std::collections::HashSet<usize> =
                right_indices.into_iter().collect();
            Ok(left_indices
                .into_iter()
                .filter(|i| right_lookup.contains(i))
                .collect())
        }
        FilterCombinantion::Or(left, right) => {
            let left_indices = create_indices_from_expression(df, left)?;
            tracing::trace!(
                "OR Left indices: {:?} for expression: {:?}",
                left_indices,
                left
            );
            let right_indices = filter_combination(df, right.as_ref())?;
            tracing::trace!(
                "OR Right indices: {:?} for expression: {:?}",
                right_indices,
                right
            );
            // An index matching both sides must be reported once, so build a
            // proper union (sorted + deduplicated), the same way `Grouped` does.
            let mut merged = left_indices;
            merged.extend(right_indices);
            merged.sort_unstable();
            merged.dedup();
            Ok(merged)
        }
        FilterCombinantion::Grouped(expressions) => {
            let mut indices = Vec::new();
            for expr in expressions {
                indices.extend(filter_combination(df, expr)?);
            }
            indices.sort_unstable();
            indices.dedup();
            Ok(indices)
        }
    }
}
/// Applies the left-hand side of `expression` to `value` and compares the
/// outcome with the expression's right-hand side.
///
/// The left-hand side decides how `value` is transformed first:
/// * `Function(_, Len)` - compares the length of a `DataValue::Vec` or
///   `DataValue::String` (as reported by `len()`, converted to `u64`); any
///   other variant never matches.
/// * `Function(_, ToDateTimeUs)` - converts both sides with
///   [`from_datavalue_to_timestamp_us`] before comparing. A failed conversion
///   of `value` falls back to `DataValue::default()`; a failed conversion of
///   the right-hand side yields `None`.
/// * `Mod(_, modulo)` - compares `value % modulo` (computed as `f64`) with the
///   right-hand side.
/// * anything else - compares `value` with the right-hand side directly.
///
/// # Returns
/// `Some(index)` when the comparison performed by [`match_operator`] holds,
/// `None` otherwise.
pub fn apply_filtering_function(
    index: usize,
    value: &DataValue,
    expression: &Expression,
) -> Option<usize> {
    match &expression.left {
        DataInput::Function(_key, Function::Len) => {
            let right = FilterArgument::Value(expression.right.value());
            let length = match value {
                DataValue::Vec(items) => items.len(),
                DataValue::String(text) => text.len(),
                // Values without a length never satisfy a `len` filter.
                _ => return None,
            };
            match_operator(
                index,
                &DataValue::from(length as u64),
                &right,
                expression.operator,
            )
        }
        DataInput::Function(_key, Function::ToDateTimeUs) => {
            let left = from_datavalue_to_timestamp_us(value).unwrap_or_default();
            let Ok(right) = from_datavalue_to_timestamp_us(&expression.right.value()) else {
                return None;
            };
            match_operator(
                index,
                &left,
                &FilterArgument::Value(right),
                expression.operator,
            )
        }
        DataInput::Mod(_key, modulo) => {
            let right = FilterArgument::Value(expression.right.value());
            let mod_result = f64::extract(value) % f64::extract(modulo);
            // The modulo result is the LEFT operand. Passing it on the right
            // would invert the ordering operators and make `In`/`NotIn` look
            // for a list on the computed number instead of the filter argument.
            let left: DataValue = mod_result.into();
            match_operator(index, &left, &right, expression.operator)
        }
        _ => {
            let right = FilterArgument::Value(expression.right.value());
            match_operator(index, value, &right, expression.operator)
        }
    }
}