datafusion_loki/
expr.rs

1use std::sync::LazyLock;
2
3use datafusion::{
4    logical_expr::{BinaryExpr, Like, Operator, ScalarUDFImpl, expr::ScalarFunction},
5    prelude::Expr,
6    scalar::ScalarValue,
7};
8
9use crate::{LABELS_FIELD_REF, LINE_FIELD_REF, MapGet, TIMESTAMP_FIELD_REF};
10
11static MAP_GET_FUNC: LazyLock<MapGet> = LazyLock::new(MapGet::new);
12
13pub fn expr_to_label_filter(expr: &Expr) -> Option<String> {
14    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
15        let Expr::ScalarFunction(ScalarFunction { func, args }) = left.as_ref() else {
16            return None;
17        };
18        if func.name() != MAP_GET_FUNC.name() {
19            return None;
20        }
21        if args.len() != 2 {
22            return None;
23        }
24        let label = match (&args[0], &args[1]) {
25            (Expr::Column(col), Expr::Literal(ScalarValue::Utf8(value), _))
26                if col.name() == LABELS_FIELD_REF.name() =>
27            {
28                value.as_ref()
29            }
30            _ => None,
31        }?;
32
33        let Expr::Literal(ScalarValue::Utf8(value), _) = right.as_ref() else {
34            return None;
35        };
36        let empty_string = String::new();
37        let value = value.as_ref().unwrap_or(&empty_string);
38
39        match op {
40            Operator::Eq => Some(format!("{label}=\"{value}\"")),
41            Operator::NotEq => Some(format!("{label}!=\"{value}\"")),
42            Operator::RegexMatch => Some(format!("{label}=~\"{value}\"")),
43            Operator::RegexNotMatch => Some(format!("{label}!~\"{value}\"")),
44            _ => None,
45        }
46    } else {
47        None
48    }
49}
50
51pub fn expr_to_line_filter(expr: &Expr) -> Option<String> {
52    let cols = expr.column_refs();
53    if cols.len() != 1 {
54        return None;
55    }
56    let col = cols.iter().next()?;
57    if col.name() != LINE_FIELD_REF.name() {
58        return None;
59    }
60
61    let empty_string = String::new();
62
63    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
64        match op {
65            Operator::RegexMatch => match (left.as_ref(), right.as_ref()) {
66                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
67                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
68                }
69                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
70                    Some(format!("|~`{}`", value.as_ref().unwrap_or(&empty_string)))
71                }
72                _ => None,
73            },
74            Operator::RegexNotMatch => match (left.as_ref(), right.as_ref()) {
75                (Expr::Column(_), Expr::Literal(ScalarValue::Utf8(value), _)) => {
76                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
77                }
78                (Expr::Literal(ScalarValue::Utf8(value), _), Expr::Column(_)) => {
79                    Some(format!("!~`{}`", value.as_ref().unwrap_or(&empty_string)))
80                }
81                _ => None,
82            },
83            _ => None,
84        }
85    } else if let Expr::Like(Like {
86        negated,
87        expr,
88        pattern,
89        case_insensitive,
90        ..
91    }) = expr
92    {
93        let Expr::Column(_) = expr.as_ref() else {
94            return None;
95        };
96        let Expr::Literal(ScalarValue::Utf8(value), _) = pattern.as_ref() else {
97            return None;
98        };
99        let value = value.as_ref().unwrap_or(&empty_string);
100        if value.starts_with("%") && value.ends_with("%") && !value.contains("_") {
101            let value = value.trim_start_matches('%').trim_end_matches('%');
102            match (negated, case_insensitive) {
103                (true, true) => Some(format!("!~ `(?i){}`", value)),
104                (true, false) => Some(format!("!= `{}`", value)),
105                (false, true) => Some(format!("|~ `(?i){}`", value)),
106                (false, false) => Some(format!("|= `{}`", value)),
107            }
108        } else {
109            None
110        }
111    } else {
112        None
113    }
114}
115
116pub enum TimestampBound {
117    Start(Option<i64>),
118    End(Option<i64>),
119}
120pub fn parse_timestamp_bound(expr: &Expr) -> Option<TimestampBound> {
121    let cols = expr.column_refs();
122    if cols.len() != 1 {
123        return None;
124    }
125    let col = cols.iter().next()?;
126    if col.name() != TIMESTAMP_FIELD_REF.name() {
127        return None;
128    }
129    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
130        match op {
131            Operator::Lt | Operator::LtEq => match (left.as_ref(), right.as_ref()) {
132                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
133                    Some(TimestampBound::End(*value))
134                }
135                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
136                    Some(TimestampBound::Start(*value))
137                }
138                _ => None,
139            },
140            Operator::Gt | Operator::GtEq => match (left.as_ref(), right.as_ref()) {
141                (Expr::Column(_), Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _)) => {
142                    Some(TimestampBound::Start(*value))
143                }
144                (Expr::Literal(ScalarValue::TimestampNanosecond(value, _), _), Expr::Column(_)) => {
145                    Some(TimestampBound::End(*value))
146                }
147                _ => None,
148            },
149            _ => None,
150        }
151    } else {
152        None
153    }
154}