trs-dataframe 0.10.1

Dataframe library for Teiresias
Documentation
use data_value::{DataValue, Extract};

use crate::filter::{
    error::Error, DataInput, Expression, FilterArgument, FilterCombinantion, FilterOperator,
    Filtering, Function,
};

/// Evaluates `value <operator> right` for a single row.
///
/// Returns `Some(i)` when the comparison holds and `None` otherwise, so the
/// result can be collected directly into a list of matching indices.
///
/// * `Equal`, `NotEqual`, `Greater`, `Less`, `GrOrEq`, `LeOrEq` compare
///   `value` against `right.value()`.
/// * `In` / `NotIn` test membership of `value` in `right.vec()`. When `right`
///   yields no vector, neither operator matches.
/// * `Regex` matches only when `right.regex()` yields a pattern and `value`
///   is a `DataValue::String` accepted by that pattern.
pub fn match_operator(
    i: usize,
    value: &DataValue,
    right: &FilterArgument,
    operator: FilterOperator,
) -> Option<usize> {
    tracing::trace!(
        "Matching operator: {:?} for value: {:?} and right: {:?}",
        operator,
        value,
        right
    );

    // Reduce every operator to a plain yes/no first; the index is attached once
    // at the end.
    let is_match = match operator {
        FilterOperator::Equal => value.eq(right.value()),
        FilterOperator::NotEqual => !value.eq(right.value()),
        FilterOperator::Greater => value.gt(right.value()),
        FilterOperator::Less => value.lt(right.value()),
        FilterOperator::GrOrEq => value.ge(right.value()),
        FilterOperator::LeOrEq => value.le(right.value()),
        FilterOperator::In => match right.vec() {
            Some(candidates) => candidates.contains(value),
            None => false,
        },
        FilterOperator::NotIn => match right.vec() {
            Some(candidates) => !candidates.contains(value),
            // A missing vector is "no match", not "vacuously not contained".
            None => false,
        },
        FilterOperator::Regex => match (right.regex(), value) {
            (Some(pattern), DataValue::String(text)) => pattern.is_match(text),
            _ => false,
        },
    };

    is_match.then_some(i)
}

/// Converts a `DataValue` into a microsecond timestamp value where possible.
///
/// * `String`: parsed with the format `%Y-%m-%d %H:%M:%S`, interpreted as UTC,
///   and returned as microseconds cast to `u64`. A string that does not parse
///   is returned unchanged (cloned).
/// * `I64` / `U64` / `F64`: the number is handed to
///   `chrono::DateTime::from_timestamp_micros`. When chrono accepts it, the
///   resulting `timestamp_micros()` is returned; otherwise the raw number is
///   cast straight to `u64`.
/// * Any other variant is returned unchanged (cloned).
///
/// Every path currently yields `Ok`; the `Result` return type is never used to
/// report a failure.
///
/// NOTE(review): all `as u64` casts are unchecked, so a negative microsecond
/// count (e.g. a pre-1970 date) does not stay negative after conversion.
/// Confirm callers never rely on such inputs.
pub fn from_datavalue_to_timestamp_us(data_value: &DataValue) -> Result<DataValue, Error> {
    // Shared tail of the numeric branches: let chrono validate `micros`, and
    // fall back to the caller-supplied raw cast when it rejects the value.
    let normalise = |micros: i64, fallback: u64| -> DataValue {
        match chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros) {
            Some(moment) => DataValue::from(moment.timestamp_micros() as u64),
            None => DataValue::U64(fallback),
        }
    };

    let converted = match data_value {
        DataValue::String(text) => {
            chrono::NaiveDateTime::parse_from_str(text.as_str(), "%Y-%m-%d %H:%M:%S")
                .map(|parsed| DataValue::from(parsed.and_utc().timestamp_micros() as u64))
                // Unparseable text is passed through untouched rather than rejected.
                .unwrap_or_else(|_| data_value.clone())
        }
        DataValue::I64(raw) => normalise(*raw, *raw as u64),
        DataValue::U64(raw) => normalise(*raw as i64, *raw),
        DataValue::F64(raw) => normalise(*raw as i64, *raw as u64),
        _ => data_value.clone(),
    };

    Ok(converted)
}

/// Free-function entry point for `df.apply_function`.
///
/// Forwards `expression` to `df.apply_function` and returns the resulting
/// `Vec<usize>` or error unchanged; no additional processing happens here.
pub fn apply_function(df: &impl Filtering, expression: &Expression) -> Result<Vec<usize>, Error> {
    df.apply_function(expression)
}

/// Free-function entry point for `filtered_df.prepare_indicies`.
///
/// Forwards `expression` to `filtered_df.prepare_indicies` and returns the
/// resulting `Vec<usize>` or error unchanged. Used by `filter_combination` to
/// resolve each individual expression.
pub fn create_indices_from_expression(
    filtered_df: &impl Filtering,
    expression: &Expression,
) -> Result<Vec<usize>, Error> {
    filtered_df.prepare_indicies(expression)
}

/// Resolves a (possibly nested) filter combination into matching row indices.
///
/// * `Simple(expr)`: the indices produced by `create_indices_from_expression`.
/// * `And(left, right)`: indices of `left` that are also produced by `right`,
///   in the order `left` produced them.
/// * `Or(left, right)`: the union of both sides, sorted ascending with
///   duplicates removed, so a row matching both sides is reported once.
/// * `Grouped(exprs)`: the union of every sub-combination, sorted ascending
///   with duplicates removed.
///
/// # Errors
///
/// Propagates the first error returned while resolving any sub-expression.
/// Both sides of `And` / `Or` are always evaluated, so an error on the
/// right-hand side surfaces even if the left-hand side matched nothing.
pub fn filter_combination(
    df: &impl Filtering,
    expression: &FilterCombinantion,
) -> Result<Vec<usize>, Error> {
    // Deliberately naive: every expression is resolved against the whole
    // frame and the index lists are combined afterwards. Narrowing the
    // candidate set with earlier results would be a possible optimisation.
    match expression {
        FilterCombinantion::Simple(expr) => create_indices_from_expression(df, expr),
        FilterCombinantion::And(left, right) => {
            let left_indices = create_indices_from_expression(df, left)?;
            tracing::trace!(
                "AND Left indices: {:?} for expression: {:?}",
                left_indices,
                left
            );
            let right_indices = filter_combination(df, right.as_ref())?;
            tracing::trace!(
                "AND Right indices: {:?} for expression: {:?}",
                right_indices,
                right
            );

            // Hash the right-hand side once so each membership probe is O(1)
            // instead of a linear scan per left index.
            let right_lookup: std::collections::HashSet<usize> =
                right_indices.into_iter().collect();

            Ok(left_indices
                .into_iter()
                .filter(|i| right_lookup.contains(i))
                .collect())
        }
        FilterCombinantion::Or(left, right) => {
            let left_indices = create_indices_from_expression(df, left)?;
            tracing::trace!(
                "OR Left indices: {:?} for expression: {:?}",
                left_indices,
                left
            );
            let right_indices = filter_combination(df, right.as_ref())?;
            tracing::trace!(
                "OR Right indices: {:?} for expression: {:?}",
                right_indices,
                right
            );

            // A row satisfying both sides must appear only once; normalise the
            // union the same way the `Grouped` branch does.
            let mut indices = left_indices;
            indices.extend(right_indices);
            indices.sort_unstable();
            indices.dedup();
            Ok(indices)
        }
        FilterCombinantion::Grouped(expressions) => {
            let mut indices = Vec::new();
            for expr in expressions {
                let expr_indices = filter_combination(df, expr)?;
                indices.extend(expr_indices);
            }
            indices.sort_unstable();
            indices.dedup();
            Ok(indices)
        }
    }
}

/// Applies the left-hand transformation of `expression` to `value` and then
/// compares the result with the expression's right-hand side.
///
/// Returns `Some(index)` when the row matches and `None` otherwise.
///
/// The left-hand input selects the transformation:
///
/// * `Function(_, Len)`: compares the `len()` of a `DataValue::Vec` or
///   `DataValue::String` (as `u64`) with the right-hand value. Any other
///   variant never matches.
/// * `Function(_, ToDateTimeUs)`: both sides are passed through
///   `from_datavalue_to_timestamp_us` before comparison. A failed conversion
///   of `value` falls back to `DataValue::default()`; a failed conversion of
///   the right-hand side yields `None`.
/// * `Mod(_, modulo)`: compares `value % modulo` (both extracted as `f64`)
///   with the right-hand value.
/// * Anything else: compares `value` with the right-hand value directly.
///
/// In every branch the transformed row value is the left operand passed to
/// `match_operator` and the filter's right-hand value is the right operand.
pub fn apply_filtering_function(
    index: usize,
    value: &DataValue,
    expression: &Expression,
) -> Option<usize> {
    match &expression.left {
        DataInput::Function(_key, Function::Len) => {
            let right = FilterArgument::Value(expression.right.value());
            match value {
                DataValue::Vec(vec) => match_operator(
                    index,
                    &DataValue::from(vec.len() as u64),
                    &right,
                    expression.operator,
                ),
                DataValue::String(s) => match_operator(
                    index,
                    &DataValue::from(s.len() as u64),
                    &right,
                    expression.operator,
                ),
                // Length is only defined for Vec and String values.
                _ => None,
            }
        }
        DataInput::Function(_key, Function::ToDateTimeUs) => {
            let left = from_datavalue_to_timestamp_us(value).unwrap_or_default();
            let Ok(right) = from_datavalue_to_timestamp_us(&expression.right.value()) else {
                return None;
            };

            match_operator(
                index,
                &left,
                &FilterArgument::Value(right),
                expression.operator,
            )
        }
        DataInput::Mod(_key, modulo) => {
            let remainder = f64::extract(value) % f64::extract(modulo);
            // The remainder is the row-side operand; the filter's right-hand
            // value stays on the right. Swapping them inverts the ordering
            // operators and makes `In` / `NotIn` look for a vector on the
            // remainder instead of on the filter argument.
            let left: DataValue = remainder.into();
            let right = FilterArgument::Value(expression.right.value());
            match_operator(index, &left, &right, expression.operator)
        }
        _ => {
            let right = FilterArgument::Value(expression.right.value());
            match_operator(index, value, &right, expression.operator)
        }
    }
}