datafusion_comet_spark_expr/predicate_funcs/
rlike.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::SparkError;
19use arrow::array::builder::BooleanBuilder;
20use arrow::array::types::Int32Type;
21use arrow::array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
22use arrow::compute::take;
23use arrow::datatypes::{DataType, Schema};
24use datafusion::common::{internal_err, Result};
25use datafusion::physical_expr::PhysicalExpr;
26use datafusion::physical_expr_common::physical_expr::DynEq;
27use datafusion::physical_plan::ColumnarValue;
28use regex::Regex;
29use std::any::Any;
30use std::fmt::{Display, Formatter};
31use std::hash::{Hash, Hasher};
32use std::sync::Arc;
33
/// Physical expression implementing the RLIKE operator.
///
/// Note that this implementation is not yet Spark-compatible and simply delegates to
/// the Rust `regex` crate. It will match Spark behavior for some simple cases but has
/// differences in whitespace handling and does not support all the features of Java's
/// regular expression engine, which are documented at:
///
/// <https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html>
#[derive(Debug)]
pub struct RLike {
    /// Expression that produces the string values to be tested against the pattern.
    child: Arc<dyn PhysicalExpr>,
    // Only scalar patterns are supported
    /// Original pattern text, kept for display, hashing, and equality checks.
    pattern_str: String,
    /// Compiled form of `pattern_str`.
    pattern: Regex,
}
49
50impl Hash for RLike {
51    fn hash<H: Hasher>(&self, state: &mut H) {
52        state.write(self.pattern_str.as_bytes());
53    }
54}
55
56impl DynEq for RLike {
57    fn dyn_eq(&self, other: &dyn Any) -> bool {
58        if let Some(other) = other.downcast_ref::<Self>() {
59            self.pattern_str == other.pattern_str
60        } else {
61            false
62        }
63    }
64}
65
66impl RLike {
67    pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
68        Ok(Self {
69            child,
70            pattern_str: pattern.to_string(),
71            pattern: Regex::new(pattern).map_err(|e| {
72                SparkError::Internal(format!("Failed to compile pattern {pattern}: {e}"))
73            })?,
74        })
75    }
76
77    fn is_match(&self, inputs: &StringArray) -> BooleanArray {
78        let mut builder = BooleanBuilder::with_capacity(inputs.len());
79        if inputs.is_nullable() {
80            for i in 0..inputs.len() {
81                if inputs.is_null(i) {
82                    builder.append_null();
83                } else {
84                    builder.append_value(self.pattern.is_match(inputs.value(i)));
85                }
86            }
87        } else {
88            for i in 0..inputs.len() {
89                builder.append_value(self.pattern.is_match(inputs.value(i)));
90            }
91        }
92        builder.finish()
93    }
94}
95
96impl Display for RLike {
97    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98        write!(
99            f,
100            "RLike [child: {}, pattern: {}] ",
101            self.child, self.pattern_str
102        )
103    }
104}
105
106impl PhysicalExpr for RLike {
107    fn as_any(&self) -> &dyn Any {
108        self
109    }
110
111    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
112        Ok(DataType::Boolean)
113    }
114
115    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
116        self.child.nullable(input_schema)
117    }
118
119    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
120        match self.child.evaluate(batch)? {
121            ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
122                let dict_array = array
123                    .as_any()
124                    .downcast_ref::<DictionaryArray<Int32Type>>()
125                    .expect("dict array");
126                let dict_values = dict_array
127                    .values()
128                    .as_any()
129                    .downcast_ref::<StringArray>()
130                    .expect("strings");
131                // evaluate the regexp pattern against the dictionary values
132                let new_values = self.is_match(dict_values);
133                // convert to conventional (not dictionary-encoded) array
134                let result = take(&new_values, dict_array.keys(), None)?;
135                Ok(ColumnarValue::Array(result))
136            }
137            ColumnarValue::Array(array) => {
138                let inputs = array
139                    .as_any()
140                    .downcast_ref::<StringArray>()
141                    .expect("string array");
142                let array = self.is_match(inputs);
143                Ok(ColumnarValue::Array(Arc::new(array)))
144            }
145            ColumnarValue::Scalar(_) => {
146                internal_err!("non scalar regexp patterns are not supported")
147            }
148        }
149    }
150
151    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
152        vec![&self.child]
153    }
154
155    fn with_new_children(
156        self: Arc<Self>,
157        children: Vec<Arc<dyn PhysicalExpr>>,
158    ) -> Result<Arc<dyn PhysicalExpr>> {
159        assert!(children.len() == 1);
160        Ok(Arc::new(RLike::try_new(
161            Arc::clone(&children[0]),
162            &self.pattern_str,
163        )?))
164    }
165
166    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
167        unimplemented!()
168    }
169}