datafusion_loki/
expr.rs

1use std::sync::LazyLock;
2
3use datafusion_common::ScalarValue;
4use datafusion_expr::{BinaryExpr, Expr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction};
5use datafusion_functions::core::getfield::GetFieldFunc;
6
7use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, TIMESTAMP_FIELD_REF};
8
9static GET_FIELD_FUNC: LazyLock<GetFieldFunc> = LazyLock::new(GetFieldFunc::new);
10
11pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
12    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
13        let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
14            return None;
15        };
16        if func.name() != GET_FIELD_FUNC.name() {
17            return None;
18        }
19        if args.len() != 2 {
20            return None;
21        }
22        let label = match (&args[0], &args[1]) {
23            (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
24                if col.name() == LABELS_FIELD_REF.name() =>
25            {
26                value.as_ref()
27            }
28            _ => None,
29        }?;
30
31        let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
32            return None;
33        };
34        let empty_string = String::new();
35        let value = value.as_ref().unwrap_or(&empty_string);
36
37        match op {
38            Operator::Eq => Some(format!("{label}=\"{value}\"")),
39            Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
40            Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
41            Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
42            _ => None,
43        }
44    } else {
45        None
46    }
47}
48
49pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
50    let cols = expr.column_refs();
51    if cols.len() != 1 {
52        return None;
53    }
54    let col = cols.iter().next()?;
55    if col.name() != LINE_FIELD_REF.name() {
56        return None;
57    }
58
59    let empty_string = String::new();
60
61    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
62        match op {
63            Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
64                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
65                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
66                }
67                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
68                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
69                }
70                _ => None,
71            },
72            Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
73                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
74                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
75                }
76                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
77                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
78                }
79                _ => None,
80            },
81            _ => None,
82        }
83    } else if let Expr::Like(Like {
84        negated,
85        expr,
86        pattern,
87        case_insensitive,
88        ..
89    }) = expr
90    {
91        let Expr::Column(_) = expr.as_ref() else {
92            return None;
93        };
94        let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
95            return None;
96        };
97        let value = value.as_ref().unwrap_or(&empty_string);
98        if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
99            let value = value.trim_start_matches('%').trim_end_matches('%');
100            match (negated, case_insensitive) {
101                (true, true) => Some(format!("!~ `(?i){}`", value)),
102                (true, false) => Some(format!("!= `{}`", value)),
103                (false, true) => Some(format!("|~ `(?i){}`", value)),
104                (false, false) => Some(format!("|= `{}`", value)),
105            }
106        } else {
107            None
108        }
109    } else {
110        None
111    }
112}
113
/// One edge of a time range extracted from a comparison against the timestamp column.
///
/// The payload is the value of the `TimestampNanosecond` literal used in the comparison;
/// it is `None` when that literal is NULL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampBound {
    /// Lower edge of the range (from a "timestamp greater than ..." style comparison).
    Start(Option<i64>),
    /// Upper edge of the range (from a "timestamp less than ..." style comparison).
    End(Option<i64>),
}
118pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
119    let cols = expr.column_refs();
120    if cols.len() != 1 {
121        return None;
122    }
123    let col = cols.iter().next()?;
124    if col.name() != TIMESTAMP_FIELD_REF.name() {
125        return None;
126    }
127    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
128        match op {
129            Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
130                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
131                    Some(TimestampBound::End(*value))
132                }
133                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
134                    Some(TimestampBound::Start(*value))
135                }
136                _ => None,
137            },
138            Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
139                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
140                    Some(TimestampBound::Start(*value))
141                }
142                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
143                    Some(TimestampBound::End(*value))
144                }
145                _ => None,
146            },
147            _ => None,
148        }
149    } else {
150        None
151    }
152}