1use std::sync::LazyLock;
2
3use datafusion_common::ScalarValue;
4use datafusion_expr::{BinaryExpr, Expr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction};
5use datafusion_functions::core::getfield::GetFieldFunc;
6
7use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, TIMESTAMP_FIELD_REF};
8
9static GET_FIELD_FUNC: LazyLock<GetFieldFunc> = LazyLock::new(GetFieldFunc::new);
10
11pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
12 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
13 let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
14 return None;
15 };
16 if func.name() != GET_FIELD_FUNC.name() {
17 return None;
18 }
19 if args.len() != 2 {
20 return None;
21 }
22 let label = match (&args[0], &args[1]) {
23 (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
24 if col.name() == LABELS_FIELD_REF.name() =>
25 {
26 value.as_ref()
27 }
28 _ => None,
29 }?;
30
31 let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
32 return None;
33 };
34 let empty_string = String::new();
35 let value = value.as_ref().unwrap_or(&empty_string);
36
37 match op {
38 Operator::Eq => Some(format!("{label}=\"{value}\"")),
39 Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
40 Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
41 Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
42 _ => None,
43 }
44 } else {
45 None
46 }
47}
48
49pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
50 let cols = expr.column_refs();
51 if cols.len() != 1 {
52 return None;
53 }
54 let col = cols.iter().next()?;
55 if col.name() != LINE_FIELD_REF.name() {
56 return None;
57 }
58
59 let empty_string = String::new();
60
61 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
62 match op {
63 Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
64 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
65 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
66 }
67 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
68 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
69 }
70 _ => None,
71 },
72 Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
73 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
74 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
75 }
76 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
77 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
78 }
79 _ => None,
80 },
81 _ => None,
82 }
83 } else if let Expr::Like(Like {
84 negated,
85 expr,
86 pattern,
87 case_insensitive,
88 ..
89 }) = expr
90 {
91 let Expr::Column(_) = expr.as_ref() else {
92 return None;
93 };
94 let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
95 return None;
96 };
97 let value = value.as_ref().unwrap_or(&empty_string);
98 if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
99 let value = value.trim_start_matches('%').trim_end_matches('%');
100 match (negated, case_insensitive) {
101 (true, true) => Some(format!("!~ `(?i){}`", value)),
102 (true, false) => Some(format!("!= `{}`", value)),
103 (false, true) => Some(format!("|~ `(?i){}`", value)),
104 (false, false) => Some(format!("|= `{}`", value)),
105 }
106 } else {
107 None
108 }
109 } else {
110 None
111 }
112}
113
114pub enum TimestampBound {
115 Start(Option<i64>),
116 End(Option<i64>),
117}
118pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
119 let cols = expr.column_refs();
120 if cols.len() != 1 {
121 return None;
122 }
123 let col = cols.iter().next()?;
124 if col.name() != TIMESTAMP_FIELD_REF.name() {
125 return None;
126 }
127 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
128 match op {
129 Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
130 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
131 Some(TimestampBound::End(*value))
132 }
133 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
134 Some(TimestampBound::Start(*value))
135 }
136 _ => None,
137 },
138 Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
139 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
140 Some(TimestampBound::Start(*value))
141 }
142 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
143 Some(TimestampBound::End(*value))
144 }
145 _ => None,
146 },
147 _ => None,
148 }
149 } else {
150 None
151 }
152}