trs_dataframe/filter/
filtering.rs

1use data_value::{DataValue, Extract};
2
3use crate::filter::{
4    error::Error, DataInput, Expression, FilterArgument, FilterCombinantion, FilterOperator,
5    Filtering, Function,
6};
7
8pub fn match_operator(
9    i: usize,
10    value: &DataValue,
11    right: &FilterArgument,
12    operator: FilterOperator,
13) -> Option<usize> {
14    tracing::trace!(
15        "Matching operator: {:?} for value: {:?} and right: {:?}",
16        operator,
17        value,
18        right
19    );
20    match operator {
21        FilterOperator::Equal => {
22            if value.eq(right.value()) {
23                Some(i)
24            } else {
25                None
26            }
27        }
28        FilterOperator::NotEqual => {
29            if !value.eq(right.value()) {
30                Some(i)
31            } else {
32                None
33            }
34        }
35        FilterOperator::Greater => {
36            if value.gt(right.value()) {
37                Some(i)
38            } else {
39                None
40            }
41        }
42        FilterOperator::Less => {
43            if value.lt(right.value()) {
44                Some(i)
45            } else {
46                None
47            }
48        }
49        FilterOperator::GrOrEq => {
50            if value.ge(right.value()) {
51                Some(i)
52            } else {
53                None
54            }
55        }
56        FilterOperator::LeOrEq => {
57            if value.le(right.value()) {
58                Some(i)
59            } else {
60                None
61            }
62        }
63        FilterOperator::In => {
64            if let Some(vec) = right.vec() {
65                if vec.contains(value) {
66                    Some(i)
67                } else {
68                    None
69                }
70            } else {
71                None
72            }
73        }
74        FilterOperator::NotIn => {
75            if let Some(vec) = right.vec() {
76                if !vec.contains(value) {
77                    Some(i)
78                } else {
79                    None
80                }
81            } else {
82                None
83            }
84        }
85        FilterOperator::Regex => {
86            if let Some(pattern) = right.regex() {
87                if let DataValue::String(value_str) = value {
88                    if pattern.is_match(value_str) {
89                        Some(i)
90                    } else {
91                        None
92                    }
93                } else {
94                    None
95                }
96            } else {
97                None
98            }
99        }
100    }
101}
102
103pub fn from_datavalue_to_timestamp_us(data_value: &DataValue) -> Result<DataValue, Error> {
104    match data_value {
105        DataValue::String(d) => {
106            // Attempt to parse the string as a datetime
107            match chrono::NaiveDateTime::parse_from_str(d.as_str(), "%Y-%m-%d %H:%M:%S") {
108                Ok(dt) => {
109                    // Convert NaiveDateTime to DataValue::String - only check
110                    Ok(DataValue::from(dt.and_utc().timestamp_micros() as u64))
111                }
112                Err(_) => {
113                    // If parsing fails, return an error or a default value
114                    Ok(data_value.clone())
115                }
116            }
117        }
118        DataValue::I64(ts) => {
119            // Convert timestamp to DateTime
120            let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts);
121            if let Some(ts) = dt {
122                Ok(DataValue::from(ts.timestamp_micros() as u64))
123            } else {
124                Ok(DataValue::U64(*ts as u64))
125            }
126        }
127        DataValue::U64(ts) => {
128            // Convert timestamp to DateTime
129            let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts as i64);
130            if let Some(ts) = dt {
131                Ok(DataValue::from(ts.timestamp_micros() as u64))
132            } else {
133                Ok(DataValue::U64(*ts))
134            }
135        }
136        DataValue::F64(ts) => {
137            // Convert timestamp to DateTime
138            let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts as i64);
139            if let Some(ts) = dt {
140                Ok(DataValue::from(ts.timestamp_micros() as u64))
141            } else {
142                Ok(DataValue::U64(*ts as u64))
143            }
144        }
145        _ => Ok(data_value.clone()),
146    }
147}
148
149pub fn apply_function(df: &impl Filtering, expression: &Expression) -> Result<Vec<usize>, Error> {
150    df.apply_function(expression)
151}
152
153pub fn create_indices_from_expression(
154    filtered_df: &impl Filtering,
155    expression: &Expression,
156) -> Result<Vec<usize>, Error> {
157    filtered_df.prepare_indicies(expression)
158}
159
160pub fn filter_combination(
161    df: &impl Filtering,
162    expression: &FilterCombinantion,
163) -> Result<Vec<usize>, Error> {
164    // this is very naive implementation for now
165    // This can be done with more complex logic by cutting indexes
166    // based on the previous results, but for now we will just
167    // create indices for each expression and then combine them
168    // based on the operator
169    match expression {
170        FilterCombinantion::Simple(expr) => create_indices_from_expression(df, expr),
171        FilterCombinantion::And(left, right) => {
172            let left_indices = create_indices_from_expression(df, left)?;
173            tracing::trace!(
174                "AND Left indices: {:?} for expression: {:?}",
175                left_indices,
176                left
177            );
178            let right_indices = filter_combination(df, right.as_ref())?;
179            tracing::trace!(
180                "AND Right indices: {:?} for expression: {:?}",
181                right_indices,
182                right
183            );
184
185            Ok(left_indices
186                .into_iter()
187                .filter(|i| right_indices.contains(i))
188                .collect())
189        }
190        FilterCombinantion::Or(left, right) => {
191            let left_indices = create_indices_from_expression(df, left)?;
192            tracing::trace!(
193                "OR Left indices: {:?} for expression: {:?}",
194                left_indices,
195                left
196            );
197            let right_indices = filter_combination(df, right.as_ref())?;
198            tracing::trace!(
199                "OR Right indices: {:?} for expression: {:?}",
200                right_indices,
201                right
202            );
203            Ok(left_indices.into_iter().chain(right_indices).collect())
204        }
205        FilterCombinantion::Grouped(expressions) => {
206            let mut indices = Vec::new();
207            for expr in expressions {
208                let expr_indices = filter_combination(df, expr)?;
209                indices.extend(expr_indices);
210            }
211            indices.sort_unstable();
212            indices.dedup();
213            Ok(indices)
214        }
215    }
216}
217
218pub fn apply_filtering_function(
219    index: usize,
220    value: &DataValue,
221    expression: &Expression,
222) -> Option<usize> {
223    match &expression.left {
224        DataInput::Function(_key, Function::Len) => {
225            let right = FilterArgument::Value(expression.right.value());
226            match value {
227                DataValue::Vec(vec) => match_operator(
228                    index,
229                    &DataValue::from(vec.len() as u64),
230                    &right,
231                    expression.operator,
232                ),
233                DataValue::String(s) => match_operator(
234                    index,
235                    &DataValue::from(s.len() as u64),
236                    &right,
237                    expression.operator,
238                ),
239                _ => {
240                    // If the value is not a Vec or String, return a default value
241                    None
242                }
243            }
244        }
245        DataInput::Function(_key, Function::ToDateTimeUs) => {
246            let left = from_datavalue_to_timestamp_us(value).unwrap_or_default();
247            let Ok(right) = from_datavalue_to_timestamp_us(&expression.right.value()) else {
248                return None;
249            };
250
251            match_operator(
252                index,
253                &left,
254                &FilterArgument::Value(right),
255                expression.operator,
256            )
257        }
258        DataInput::Mod(_key, modulo) => {
259            let right_value = expression.right.value();
260            let mod_result = f64::extract(value) % f64::extract(modulo);
261            match_operator(
262                index,
263                &right_value,
264                &FilterArgument::Value(mod_result.into()),
265                expression.operator,
266            )
267        }
268        _ => {
269            let right = FilterArgument::Value(expression.right.value());
270            match_operator(index, value, &right, expression.operator)
271        }
272    }
273}