datafusion_comet_spark_expr/predicate_funcs/
rlike.rs1use crate::SparkError;
19use arrow::array::builder::BooleanBuilder;
20use arrow::array::types::Int32Type;
21use arrow::array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
22use arrow::compute::take;
23use arrow::datatypes::{DataType, Schema};
24use datafusion::common::{internal_err, Result};
25use datafusion::physical_expr::PhysicalExpr;
26use datafusion::physical_expr_common::physical_expr::DynEq;
27use datafusion::physical_plan::ColumnarValue;
28use regex::Regex;
29use std::any::Any;
30use std::fmt::{Display, Formatter};
31use std::hash::{Hash, Hasher};
32use std::sync::Arc;
33
34#[derive(Debug)]
43pub struct RLike {
44 child: Arc<dyn PhysicalExpr>,
45 pattern_str: String,
47 pattern: Regex,
48}
49
50impl Hash for RLike {
51 fn hash<H: Hasher>(&self, state: &mut H) {
52 state.write(self.pattern_str.as_bytes());
53 }
54}
55
56impl DynEq for RLike {
57 fn dyn_eq(&self, other: &dyn Any) -> bool {
58 if let Some(other) = other.downcast_ref::<Self>() {
59 self.pattern_str == other.pattern_str
60 } else {
61 false
62 }
63 }
64}
65
66impl RLike {
67 pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
68 Ok(Self {
69 child,
70 pattern_str: pattern.to_string(),
71 pattern: Regex::new(pattern).map_err(|e| {
72 SparkError::Internal(format!("Failed to compile pattern {pattern}: {e}"))
73 })?,
74 })
75 }
76
77 fn is_match(&self, inputs: &StringArray) -> BooleanArray {
78 let mut builder = BooleanBuilder::with_capacity(inputs.len());
79 if inputs.is_nullable() {
80 for i in 0..inputs.len() {
81 if inputs.is_null(i) {
82 builder.append_null();
83 } else {
84 builder.append_value(self.pattern.is_match(inputs.value(i)));
85 }
86 }
87 } else {
88 for i in 0..inputs.len() {
89 builder.append_value(self.pattern.is_match(inputs.value(i)));
90 }
91 }
92 builder.finish()
93 }
94}
95
96impl Display for RLike {
97 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98 write!(
99 f,
100 "RLike [child: {}, pattern: {}] ",
101 self.child, self.pattern_str
102 )
103 }
104}
105
106impl PhysicalExpr for RLike {
107 fn as_any(&self) -> &dyn Any {
108 self
109 }
110
111 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
112 Ok(DataType::Boolean)
113 }
114
115 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
116 self.child.nullable(input_schema)
117 }
118
119 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
120 match self.child.evaluate(batch)? {
121 ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
122 let dict_array = array
123 .as_any()
124 .downcast_ref::<DictionaryArray<Int32Type>>()
125 .expect("dict array");
126 let dict_values = dict_array
127 .values()
128 .as_any()
129 .downcast_ref::<StringArray>()
130 .expect("strings");
131 let new_values = self.is_match(dict_values);
133 let result = take(&new_values, dict_array.keys(), None)?;
135 Ok(ColumnarValue::Array(result))
136 }
137 ColumnarValue::Array(array) => {
138 let inputs = array
139 .as_any()
140 .downcast_ref::<StringArray>()
141 .expect("string array");
142 let array = self.is_match(inputs);
143 Ok(ColumnarValue::Array(Arc::new(array)))
144 }
145 ColumnarValue::Scalar(_) => {
146 internal_err!("non scalar regexp patterns are not supported")
147 }
148 }
149 }
150
151 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
152 vec![&self.child]
153 }
154
155 fn with_new_children(
156 self: Arc<Self>,
157 children: Vec<Arc<dyn PhysicalExpr>>,
158 ) -> Result<Arc<dyn PhysicalExpr>> {
159 assert!(children.len() == 1);
160 Ok(Arc::new(RLike::try_new(
161 Arc::clone(&children[0]),
162 &self.pattern_str,
163 )?))
164 }
165
166 fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
167 unimplemented!()
168 }
169}