trs_dataframe/filter/
filtering.rs1use data_value::{DataValue, Extract};
2
3use crate::filter::{
4 error::Error, DataInput, Expression, FilterArgument, FilterCombinantion, FilterOperator,
5 Filtering, Function,
6};
7
8pub fn match_operator(
9 i: usize,
10 value: &DataValue,
11 right: &FilterArgument,
12 operator: FilterOperator,
13) -> Option<usize> {
14 tracing::trace!(
15 "Matching operator: {:?} for value: {:?} and right: {:?}",
16 operator,
17 value,
18 right
19 );
20 match operator {
21 FilterOperator::Equal => {
22 if value.eq(right.value()) {
23 Some(i)
24 } else {
25 None
26 }
27 }
28 FilterOperator::NotEqual => {
29 if !value.eq(right.value()) {
30 Some(i)
31 } else {
32 None
33 }
34 }
35 FilterOperator::Greater => {
36 if value.gt(right.value()) {
37 Some(i)
38 } else {
39 None
40 }
41 }
42 FilterOperator::Less => {
43 if value.lt(right.value()) {
44 Some(i)
45 } else {
46 None
47 }
48 }
49 FilterOperator::GrOrEq => {
50 if value.ge(right.value()) {
51 Some(i)
52 } else {
53 None
54 }
55 }
56 FilterOperator::LeOrEq => {
57 if value.le(right.value()) {
58 Some(i)
59 } else {
60 None
61 }
62 }
63 FilterOperator::In => {
64 if let Some(vec) = right.vec() {
65 if vec.contains(value) {
66 Some(i)
67 } else {
68 None
69 }
70 } else {
71 None
72 }
73 }
74 FilterOperator::NotIn => {
75 if let Some(vec) = right.vec() {
76 if !vec.contains(value) {
77 Some(i)
78 } else {
79 None
80 }
81 } else {
82 None
83 }
84 }
85 FilterOperator::Regex => {
86 if let Some(pattern) = right.regex() {
87 if let DataValue::String(value_str) = value {
88 if pattern.is_match(value_str) {
89 Some(i)
90 } else {
91 None
92 }
93 } else {
94 None
95 }
96 } else {
97 None
98 }
99 }
100 }
101}
102
103pub fn from_datavalue_to_timestamp_us(data_value: &DataValue) -> Result<DataValue, Error> {
104 match data_value {
105 DataValue::String(d) => {
106 match chrono::NaiveDateTime::parse_from_str(d.as_str(), "%Y-%m-%d %H:%M:%S") {
108 Ok(dt) => {
109 Ok(DataValue::from(dt.and_utc().timestamp_micros() as u64))
111 }
112 Err(_) => {
113 Ok(data_value.clone())
115 }
116 }
117 }
118 DataValue::I64(ts) => {
119 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts);
121 if let Some(ts) = dt {
122 Ok(DataValue::from(ts.timestamp_micros() as u64))
123 } else {
124 Ok(DataValue::U64(*ts as u64))
125 }
126 }
127 DataValue::U64(ts) => {
128 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts as i64);
130 if let Some(ts) = dt {
131 Ok(DataValue::from(ts.timestamp_micros() as u64))
132 } else {
133 Ok(DataValue::U64(*ts))
134 }
135 }
136 DataValue::F64(ts) => {
137 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(*ts as i64);
139 if let Some(ts) = dt {
140 Ok(DataValue::from(ts.timestamp_micros() as u64))
141 } else {
142 Ok(DataValue::U64(*ts as u64))
143 }
144 }
145 _ => Ok(data_value.clone()),
146 }
147}
148
149pub fn apply_function(df: &impl Filtering, expression: &Expression) -> Result<Vec<usize>, Error> {
150 df.apply_function(expression)
151}
152
153pub fn create_indices_from_expression(
154 filtered_df: &impl Filtering,
155 expression: &Expression,
156) -> Result<Vec<usize>, Error> {
157 filtered_df.prepare_indicies(expression)
158}
159
160pub fn filter_combination(
161 df: &impl Filtering,
162 expression: &FilterCombinantion,
163) -> Result<Vec<usize>, Error> {
164 match expression {
170 FilterCombinantion::Simple(expr) => create_indices_from_expression(df, expr),
171 FilterCombinantion::And(left, right) => {
172 let left_indices = create_indices_from_expression(df, left)?;
173 tracing::trace!(
174 "AND Left indices: {:?} for expression: {:?}",
175 left_indices,
176 left
177 );
178 let right_indices = filter_combination(df, right.as_ref())?;
179 tracing::trace!(
180 "AND Right indices: {:?} for expression: {:?}",
181 right_indices,
182 right
183 );
184
185 Ok(left_indices
186 .into_iter()
187 .filter(|i| right_indices.contains(i))
188 .collect())
189 }
190 FilterCombinantion::Or(left, right) => {
191 let left_indices = create_indices_from_expression(df, left)?;
192 tracing::trace!(
193 "OR Left indices: {:?} for expression: {:?}",
194 left_indices,
195 left
196 );
197 let right_indices = filter_combination(df, right.as_ref())?;
198 tracing::trace!(
199 "OR Right indices: {:?} for expression: {:?}",
200 right_indices,
201 right
202 );
203 Ok(left_indices.into_iter().chain(right_indices).collect())
204 }
205 FilterCombinantion::Grouped(expressions) => {
206 let mut indices = Vec::new();
207 for expr in expressions {
208 let expr_indices = filter_combination(df, expr)?;
209 indices.extend(expr_indices);
210 }
211 indices.sort_unstable();
212 indices.dedup();
213 Ok(indices)
214 }
215 }
216}
217
218pub fn apply_filtering_function(
219 index: usize,
220 value: &DataValue,
221 expression: &Expression,
222) -> Option<usize> {
223 match &expression.left {
224 DataInput::Function(_key, Function::Len) => {
225 let right = FilterArgument::Value(expression.right.value());
226 match value {
227 DataValue::Vec(vec) => match_operator(
228 index,
229 &DataValue::from(vec.len() as u64),
230 &right,
231 expression.operator,
232 ),
233 DataValue::String(s) => match_operator(
234 index,
235 &DataValue::from(s.len() as u64),
236 &right,
237 expression.operator,
238 ),
239 _ => {
240 None
242 }
243 }
244 }
245 DataInput::Function(_key, Function::ToDateTimeUs) => {
246 let left = from_datavalue_to_timestamp_us(value).unwrap_or_default();
247 let Ok(right) = from_datavalue_to_timestamp_us(&expression.right.value()) else {
248 return None;
249 };
250
251 match_operator(
252 index,
253 &left,
254 &FilterArgument::Value(right),
255 expression.operator,
256 )
257 }
258 DataInput::Mod(_key, modulo) => {
259 let right_value = expression.right.value();
260 let mod_result = f64::extract(value) % f64::extract(modulo);
261 match_operator(
262 index,
263 &right_value,
264 &FilterArgument::Value(mod_result.into()),
265 expression.operator,
266 )
267 }
268 _ => {
269 let right = FilterArgument::Value(expression.right.value());
270 match_operator(index, value, &right, expression.operator)
271 }
272 }
273}