datafusion_loki/
expr.rs

1use std::sync::LazyLock;
2
3use datafusion::{
4    functions::core::getfield::GetFieldFunc,
5    logical_expr::{BinaryExpr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction},
6    prelude::Expr,
7    scalar::ScalarValue,
8};
9
10use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, TIMESTAMP_FIELD_REF};
11
/// Lazily constructed [`GetFieldFunc`] instance.
///
/// `expr_to_label_filter` compares a scalar function's `name()` against this
/// instance's `name()` to recognise field-access calls.
static GET_FIELD_FUNC: LazyLock<GetFieldFunc> = LazyLock::new(GetFieldFunc::new);
13
14pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
15    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
16        let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
17            return None;
18        };
19        if func.name() != GET_FIELD_FUNC.name() {
20            return None;
21        }
22        if args.len() != 2 {
23            return None;
24        }
25        let label = match (&args[0], &args[1]) {
26            (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
27                if col.name() == LABELS_FIELD_REF.name() =>
28            {
29                value.as_ref()
30            }
31            _ => None,
32        }?;
33
34        let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
35            return None;
36        };
37        let empty_string = String::new();
38        let value = value.as_ref().unwrap_or(&empty_string);
39
40        match op {
41            Operator::Eq => Some(format!("{label}=\"{value}\"")),
42            Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
43            Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
44            Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
45            _ => None,
46        }
47    } else {
48        None
49    }
50}
51
52pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
53    let cols = expr.column_refs();
54    if cols.len() != 1 {
55        return None;
56    }
57    let col = cols.iter().next()?;
58    if col.name() != LINE_FIELD_REF.name() {
59        return None;
60    }
61
62    let empty_string = String::new();
63
64    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
65        match op {
66            Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
67                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
68                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
69                }
70                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
71                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
72                }
73                _ => None,
74            },
75            Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
76                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
77                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
78                }
79                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
80                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
81                }
82                _ => None,
83            },
84            _ => None,
85        }
86    } else if let Expr::Like(Like {
87        negated,
88        expr,
89        pattern,
90        case_insensitive,
91        ..
92    }) = expr
93    {
94        let Expr::Column(_) = expr.as_ref() else {
95            return None;
96        };
97        let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
98            return None;
99        };
100        let value = value.as_ref().unwrap_or(&empty_string);
101        if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
102            let value = value.trim_start_matches('%').trim_end_matches('%');
103            match (negated, case_insensitive) {
104                (true, true) => Some(format!("!~ `(?i){}`", value)),
105                (true, false) => Some(format!("!= `{}`", value)),
106                (false, true) => Some(format!("|~ `(?i){}`", value)),
107                (false, false) => Some(format!("|= `{}`", value)),
108            }
109        } else {
110            None
111        }
112    } else {
113        None
114    }
115}
116
/// One side of a time range extracted from a timestamp comparison.
///
/// Produced by `parse_timestamp_bound`. The payload is copied from a
/// `ScalarValue::TimestampNanosecond` literal, so it is `None` when that
/// literal is null.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampBound {
    /// Lower bound: the timestamp column is greater than (or equal to) this value.
    Start(Option<i64>),
    /// Upper bound: the timestamp column is less than (or equal to) this value.
    End(Option<i64>),
}
121pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
122    let cols = expr.column_refs();
123    if cols.len() != 1 {
124        return None;
125    }
126    let col = cols.iter().next()?;
127    if col.name() != TIMESTAMP_FIELD_REF.name() {
128        return None;
129    }
130    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
131        match op {
132            Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
133                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
134                    Some(TimestampBound::End(*value))
135                }
136                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
137                    Some(TimestampBound::Start(*value))
138                }
139                _ => None,
140            },
141            Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
142                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
143                    Some(TimestampBound::Start(*value))
144                }
145                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
146                    Some(TimestampBound::End(*value))
147                }
148                _ => None,
149            },
150            _ => None,
151        }
152    } else {
153        None
154    }
155}