Skip to main content

trs_dataframe/filter/
filtering.rs

1use data_value::{DataValue, Extract};
2
3use crate::filter::{
4    error::Error, DataInput, Expression, FilterArgument, FilterCombinantion, FilterOperator,
5    Filtering, Function,
6};
7
8pub fn match_operator(
9    i: usize,
10    value: &DataValue,
11    right: &FilterArgument,
12    operator: FilterOperator,
13) -> Option<usize> {
14    tracing::trace!(
15        "Matching operator: {:?} for value: {:?} and right: {:?}",
16        operator,
17        value,
18        right
19    );
20    match operator {
21        FilterOperator::Equal => {
22            if value.eq(right.value()) {
23                Some(i)
24            } else {
25                None
26            }
27        }
28        FilterOperator::NotEqual => {
29            if !value.eq(right.value()) {
30                Some(i)
31            } else {
32                None
33            }
34        }
35        FilterOperator::Greater => {
36            if value.gt(right.value()) {
37                Some(i)
38            } else {
39                None
40            }
41        }
42        FilterOperator::Less => {
43            if value.lt(right.value()) {
44                Some(i)
45            } else {
46                None
47            }
48        }
49        FilterOperator::GrOrEq => {
50            if value.ge(right.value()) {
51                Some(i)
52            } else {
53                None
54            }
55        }
56        FilterOperator::LeOrEq => {
57            if value.le(right.value()) {
58                Some(i)
59            } else {
60                None
61            }
62        }
63        FilterOperator::In => {
64            if let Some(vec) = right.vec() {
65                if vec.contains(value) {
66                    Some(i)
67                } else {
68                    None
69                }
70            } else {
71                None
72            }
73        }
74        FilterOperator::NotIn => {
75            if let Some(vec) = right.vec() {
76                if !vec.contains(value) {
77                    Some(i)
78                } else {
79                    None
80                }
81            } else {
82                None
83            }
84        }
85        FilterOperator::Regex => {
86            if let Some(pattern) = right.regex() {
87                if let DataValue::String(value_str) = value {
88                    if pattern.is_match(value_str) {
89                        Some(i)
90                    } else {
91                        None
92                    }
93                } else {
94                    None
95                }
96            } else {
97                None
98            }
99        }
100    }
101}
102
/// Normalises a [`DataValue`] to a microsecond timestamp stored as an unsigned integer.
///
/// - `String`: parsed with the format `%Y-%m-%d %H:%M:%S` and interpreted as UTC. When
///   parsing fails the input is returned unchanged (cloned) rather than reported as an
///   error.
/// - `I64` / `U64` / `F64`: the number is treated as microseconds, passed through
///   `chrono::DateTime::<Utc>::from_timestamp_micros`, and re-emitted as `u64`. When
///   chrono rejects the value, the number itself is cast to `u64` and returned.
/// - Any other variant is returned unchanged (cloned).
///
/// Every path currently returns `Ok`; the `Result` in the signature is never `Err`.
///
/// NOTE(review): the `as u64` / `as i64` casts are plain numeric casts, so negative
/// inputs wrap and out-of-range floats saturate — confirm that this is acceptable
/// for pre-1970 timestamps.
#[cfg(feature = "timestamp-filter")]
pub fn from_datavalue_to_timestamp_us(data_value: &DataValue) -> Result<DataValue, Error> {
    // Shared path for the numeric variants: validate through chrono, otherwise
    // fall back to the caller-supplied raw value.
    let from_micros = |micros: i64, fallback: u64| {
        chrono::DateTime::<chrono::Utc>::from_timestamp_micros(micros)
            .map(|dt| DataValue::from(dt.timestamp_micros() as u64))
            .unwrap_or(DataValue::U64(fallback))
    };

    let converted = match data_value {
        DataValue::String(d) => {
            chrono::NaiveDateTime::parse_from_str(d.as_str(), "%Y-%m-%d %H:%M:%S")
                .map(|dt| DataValue::from(dt.and_utc().timestamp_micros() as u64))
                // Not a datetime string: hand the original value back untouched.
                .unwrap_or_else(|_| data_value.clone())
        }
        DataValue::I64(ts) => from_micros(*ts, *ts as u64),
        DataValue::U64(ts) => from_micros(*ts as i64, *ts),
        DataValue::F64(ts) => from_micros(*ts as i64, *ts as u64),
        _ => data_value.clone(),
    };

    Ok(converted)
}
149
/// Free-function shim that forwards to `df.apply_function(expression)`.
///
/// The result (or error) of the [`Filtering`] implementation is returned unchanged.
/// Presumably the returned `Vec<usize>` holds the indices of the matching rows —
/// confirm against the `Filtering` trait documentation.
pub fn apply_function(df: &impl Filtering, expression: &Expression) -> Result<Vec<usize>, Error> {
    df.apply_function(expression)
}
153
/// Free-function shim that forwards to `filtered_df.prepare_indicies(expression)`.
///
/// The result (or error) of the [`Filtering`] implementation is returned unchanged.
/// Presumably the returned `Vec<usize>` holds the indices of rows satisfying
/// `expression` — confirm against the `Filtering` trait documentation.
pub fn create_indices_from_expression(
    filtered_df: &impl Filtering,
    expression: &Expression,
) -> Result<Vec<usize>, Error> {
    // `prepare_indicies` (sic) is the spelling used by the trait method.
    filtered_df.prepare_indicies(expression)
}
160
161pub fn filter_combination(
162    df: &impl Filtering,
163    expression: &FilterCombinantion,
164) -> Result<Vec<usize>, Error> {
165    // this is very naive implementation for now
166    // This can be done with more complex logic by cutting indexes
167    // based on the previous results, but for now we will just
168    // create indices for each expression and then combine them
169    // based on the operator
170    match expression {
171        FilterCombinantion::Simple(expr) => create_indices_from_expression(df, expr),
172        FilterCombinantion::And(left, right) => {
173            let left_indices = create_indices_from_expression(df, left)?;
174            tracing::trace!(
175                "AND Left indices: {:?} for expression: {:?}",
176                left_indices,
177                left
178            );
179            let right_indices = filter_combination(df, right.as_ref())?;
180            tracing::trace!(
181                "AND Right indices: {:?} for expression: {:?}",
182                right_indices,
183                right
184            );
185
186            Ok(left_indices
187                .into_iter()
188                .filter(|i| right_indices.contains(i))
189                .collect())
190        }
191        FilterCombinantion::Or(left, right) => {
192            let left_indices = create_indices_from_expression(df, left)?;
193            tracing::trace!(
194                "OR Left indices: {:?} for expression: {:?}",
195                left_indices,
196                left
197            );
198            let right_indices = filter_combination(df, right.as_ref())?;
199            tracing::trace!(
200                "OR Right indices: {:?} for expression: {:?}",
201                right_indices,
202                right
203            );
204            Ok(left_indices.into_iter().chain(right_indices).collect())
205        }
206        FilterCombinantion::Grouped(expressions) => {
207            let mut indices = Vec::new();
208            for expr in expressions {
209                let expr_indices = filter_combination(df, expr)?;
210                indices.extend(expr_indices);
211            }
212            indices.sort_unstable();
213            indices.dedup();
214            Ok(indices)
215        }
216    }
217}
218
219pub fn apply_filtering_function(
220    index: usize,
221    value: &DataValue,
222    expression: &Expression,
223) -> Option<usize> {
224    match &expression.left {
225        DataInput::Function(_key, Function::Len) => {
226            let right = FilterArgument::Value(expression.right.value());
227            match value {
228                DataValue::Vec(vec) => match_operator(
229                    index,
230                    &DataValue::from(vec.len() as u64),
231                    &right,
232                    expression.operator,
233                ),
234                DataValue::String(s) => match_operator(
235                    index,
236                    &DataValue::from(s.len() as u64),
237                    &right,
238                    expression.operator,
239                ),
240                _ => {
241                    // If the value is not a Vec or String, return a default value
242                    None
243                }
244            }
245        }
246        #[cfg(feature = "timestamp-filter")]
247        DataInput::Function(_key, Function::ToDateTimeUs) => {
248            let left = from_datavalue_to_timestamp_us(value).unwrap_or_default();
249            let Ok(right) = from_datavalue_to_timestamp_us(&expression.right.value()) else {
250                return None;
251            };
252
253            match_operator(
254                index,
255                &left,
256                &FilterArgument::Value(right),
257                expression.operator,
258            )
259        }
260        DataInput::Mod(_key, modulo) => {
261            let right_value = expression.right.value();
262            let mod_result = f64::extract(value) % f64::extract(modulo);
263            match_operator(
264                index,
265                &right_value,
266                &FilterArgument::Value(mod_result.into()),
267                expression.operator,
268            )
269        }
270        _ => {
271            let right = FilterArgument::Value(expression.right.value());
272            match_operator(index, value, &right, expression.operator)
273        }
274    }
275}
276
// Table-driven tests: each `#[case]` supplies an input frame, the frame expected after
// filtering, and the filter rules parsed from their textual form.
#[cfg(test)]
mod test {
    use crate::filter::FilterRules;

    use super::*;
    use crate::{column_frame, ColumnFrame};
    use rstest::*;
    use tracing_test::traced_test;
    // Only compiled with the `timestamp-filter` feature. Note that the two trailing
    // `t.len()` cases live here as well, so they are skipped without that feature.
    #[cfg(feature = "timestamp-filter")]
    #[rstest]
    // u64 microsecond column; 1752001987000000 µs is 2025-07-08 19:13:07 UTC.
    #[case(
        column_frame! {
            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000u64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Same comparison with an f64 column.
    #[case(
        column_frame! {
            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000f64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Same comparison with an i64 column.
    #[case(
        column_frame! {
            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000i64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Strict "earlier than" on a numeric column: only the first row qualifies.
    #[case(
        column_frame! {
            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1751001987000000u64],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Strict "earlier than" where the column itself holds datetime strings.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => ["2025-07-08 18:13:07"],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // String length: every value is 19 characters long, so `< 10` selects nothing.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [],
            "b" => [],
            "c" => []
        },
        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
    )]
    // String length: every value is 19 characters long, so `> 10` keeps all rows.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
    )]
    #[traced_test]
    fn filter_test_timestamp(
        #[case] df: ColumnFrame,
        #[case] expected: ColumnFrame,
        #[case] filter: FilterRules,
    ) {
        let filtered = df.filter(&filter).expect("BUG: cannot filter");
        assert_eq!(filtered, expected);
    }

    #[rstest]
    // `len()` on a Vec column: only the empty vector has length 0.
    #[case(
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [DataValue::Vec(vec![])],
            "b" => [5],
            "c" => [ 8]
        },
        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
    )]
    // `len()` on a Vec column: only the single-element vector has length 1.
    #[case(
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into()])],
            "b" => [6],
            "c" => [9]
        },
        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
    )]
    // Single comparison.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1, 2],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
    )]
    // AND: intersection of both conditions.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
    )]
    // AND where the right-hand side matches nothing: empty result.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [],
            "b" => [],
            "c" => []
        },
        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
    )]
    // OR where the right-hand side matches nothing: the left-hand rows survive.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1, 2],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
    )]
    // AND combined with a parenthesised OR group.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
    )]
    // Regex operator on a string column.
    #[case(
        column_frame! {
            "a" => ["abcd", "ab", "abcdefg"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => ["abcd","abcdefg"],
            "b" => [4, 6],
            "c" => [7, 9]
        },
        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
    )]
    // `in` operator.
    // NOTE(review): the rule string ends with a stray `'` — confirm the parser is meant to accept it.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
    )]
    // `notIn` operator.
    // NOTE(review): the rule string ends with a stray `'` — confirm the parser is meant to accept it.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2, 3],
            "b" => [5, 6],
            "c" => [8, 9]
        },
        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
    )]
    // OR whose two sides select overlapping rows; each row is expected exactly once.
    #[case(
        column_frame! {
            "a" => [1f64, 2f64, 3f64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1f64, 2f64],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
    )]
    // Three-way AND with a nested OR group and untyped integer literals.
    #[case(
        column_frame! {
            "a" => [1f64, 2f64, 3f64],
            "b" => [4i64, 5i64, 6i64],
            "c" => [7i64, 8i64, 9i64]
        },
        column_frame! {
            "a" => [1f64, 2f64],
            "b" => [4i64, 5i64],
            "c" => [7i64, 8i64]
        },
        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
    )]
    #[traced_test]
    fn filter_test(
        #[case] df: ColumnFrame,
        #[case] expected: ColumnFrame,
        #[case] filter: FilterRules,
    ) {
        let filtered = df.filter(&filter).expect("BUG: cannot filter");
        assert_eq!(filtered, expected);
    }
}