1use std::sync::LazyLock;
2
3use datafusion::{
4 functions::core::getfield::GetFieldFunc,
5 logical_expr::{BinaryExpr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction},
6 prelude::Expr,
7 scalar::ScalarValue,
8};
9
10use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, TIMESTAMP_FIELD_REF};
11
12static GET_FIELD_FUNC: LazyLock<GetFieldFunc> = LazyLock::new(GetFieldFunc::new);
13
14pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
15 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
16 let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
17 return None;
18 };
19 if func.name() != GET_FIELD_FUNC.name() {
20 return None;
21 }
22 if args.len() != 2 {
23 return None;
24 }
25 let label = match (&args[0], &args[1]) {
26 (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
27 if col.name() == LABELS_FIELD_REF.name() =>
28 {
29 value.as_ref()
30 }
31 _ => None,
32 }?;
33
34 let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
35 return None;
36 };
37 let empty_string = String::new();
38 let value = value.as_ref().unwrap_or(&empty_string);
39
40 match op {
41 Operator::Eq => Some(format!("{label}=\"{value}\"")),
42 Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
43 Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
44 Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
45 _ => None,
46 }
47 } else {
48 None
49 }
50}
51
52pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
53 let cols = expr.column_refs();
54 if cols.len() != 1 {
55 return None;
56 }
57 let col = cols.iter().next()?;
58 if col.name() != LINE_FIELD_REF.name() {
59 return None;
60 }
61
62 let empty_string = String::new();
63
64 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
65 match op {
66 Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
67 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
68 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
69 }
70 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
71 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
72 }
73 _ => None,
74 },
75 Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
76 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
77 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
78 }
79 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
80 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
81 }
82 _ => None,
83 },
84 _ => None,
85 }
86 } else if let Expr::Like(Like {
87 negated,
88 expr,
89 pattern,
90 case_insensitive,
91 ..
92 }) = expr
93 {
94 let Expr::Column(_) = expr.as_ref() else {
95 return None;
96 };
97 let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
98 return None;
99 };
100 let value = value.as_ref().unwrap_or(&empty_string);
101 if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
102 let value = value.trim_start_matches('%').trim_end_matches('%');
103 match (negated, case_insensitive) {
104 (true, true) => Some(format!("!~ `(?i){}`", value)),
105 (true, false) => Some(format!("!= `{}`", value)),
106 (false, true) => Some(format!("|~ `(?i){}`", value)),
107 (false, false) => Some(format!("|= `{}`", value)),
108 }
109 } else {
110 None
111 }
112 } else {
113 None
114 }
115}
116
117pub enum TimestampBound {
118 Start(Option<i64>),
119 End(Option<i64>),
120}
121pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
122 let cols = expr.column_refs();
123 if cols.len() != 1 {
124 return None;
125 }
126 let col = cols.iter().next()?;
127 if col.name() != TIMESTAMP_FIELD_REF.name() {
128 return None;
129 }
130 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
131 match op {
132 Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
133 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
134 Some(TimestampBound::End(*value))
135 }
136 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
137 Some(TimestampBound::Start(*value))
138 }
139 _ => None,
140 },
141 Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
142 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
143 Some(TimestampBound::Start(*value))
144 }
145 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
146 Some(TimestampBound::End(*value))
147 }
148 _ => None,
149 },
150 _ => None,
151 }
152 } else {
153 None
154 }
155}