1use std::sync::LazyLock;
2
3use datafusion::{
4 logical_expr::{BinaryExpr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction},
5 prelude::Expr,
6 scalar::ScalarValue,
7};
8
9use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, MapGet, TIMESTAMP_FIELD_REF};
10
/// Lazily constructed `MapGet` instance.
///
/// Within this file it is only consulted for its `name()`, which is compared against the name of
/// scalar functions found in filter expressions to recognise `map_get` calls.
static MAP_GET_FUNC: LazyLock<MapGet> = LazyLock::new(MapGet::new);
12
13pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
14 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
15 let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
16 return None;
17 };
18 if func.name() != MAP_GET_FUNC.name() {
19 return None;
20 }
21 if args.len() != 2 {
22 return None;
23 }
24 let label = match (&args[0], &args[1]) {
25 (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
26 if col.name() == LABELS_FIELD_REF.name() =>
27 {
28 value.as_ref()
29 }
30 _ => None,
31 }?;
32
33 let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
34 return None;
35 };
36 let empty_string = String::new();
37 let value = value.as_ref().unwrap_or(&empty_string);
38
39 match op {
40 Operator::Eq => Some(format!("{label}=\"{value}\"")),
41 Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
42 Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
43 Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
44 _ => None,
45 }
46 } else {
47 None
48 }
49}
50
51pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
52 let cols = expr.column_refs();
53 if cols.len() != 1 {
54 return None;
55 }
56 let col = cols.iter().next()?;
57 if col.name() != LINE_FIELD_REF.name() {
58 return None;
59 }
60
61 let empty_string = String::new();
62
63 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
64 match op {
65 Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
66 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
67 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
68 }
69 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
70 Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
71 }
72 _ => None,
73 },
74 Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
75 (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
76 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
77 }
78 (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
79 Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
80 }
81 _ => None,
82 },
83 _ => None,
84 }
85 } else if let Expr::Like(Like {
86 negated,
87 expr,
88 pattern,
89 case_insensitive,
90 ..
91 }) = expr
92 {
93 let Expr::Column(_) = expr.as_ref() else {
94 return None;
95 };
96 let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
97 return None;
98 };
99 let value = value.as_ref().unwrap_or(&empty_string);
100 if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
101 let value = value.trim_start_matches('%').trim_end_matches('%');
102 match (negated, case_insensitive) {
103 (true, true) => Some(format!("!~ `(?i){}`", value)),
104 (true, false) => Some(format!("!= `{}`", value)),
105 (false, true) => Some(format!("|~ `(?i){}`", value)),
106 (false, false) => Some(format!("|= `{}`", value)),
107 }
108 } else {
109 None
110 }
111 } else {
112 None
113 }
114}
115
/// One side of a timestamp range extracted from a comparison against the timestamp column.
///
/// The payload is the optional `i64` carried by the `TimestampNanosecond` literal in the
/// comparison; it is `None` when that literal holds no value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampBound {
    /// The timestamp column is bounded from below by this value.
    Start(Option<i64>),
    /// The timestamp column is bounded from above by this value.
    End(Option<i64>),
}
120pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
121 let cols = expr.column_refs();
122 if cols.len() != 1 {
123 return None;
124 }
125 let col = cols.iter().next()?;
126 if col.name() != TIMESTAMP_FIELD_REF.name() {
127 return None;
128 }
129 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
130 match op {
131 Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
132 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
133 Some(TimestampBound::End(*value))
134 }
135 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
136 Some(TimestampBound::Start(*value))
137 }
138 _ => None,
139 },
140 Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
141 (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
142 Some(TimestampBound::Start(*value))
143 }
144 (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
145 Some(TimestampBound::End(*value))
146 }
147 _ => None,
148 },
149 _ => None,
150 }
151 } else {
152 None
153 }
154}