use std::any::Any;
use std::cell::RefCell;
use std::fmt;
use std::num::NonZeroUsize;
use lru::LruCache;
use regex::Regex;
use rustc_hash::FxHashMap;
use super::{find_column_index, resolve_alias, Expression};
use crate::core::{Result, Row, Schema};
/// Capacity of the per-thread LRU cache of compiled LIKE regexes.
const LIKE_REGEX_CACHE_SIZE: usize = 128;
thread_local! {
/// Per-thread LRU cache mapping a translated regex source string to its
/// compiled `Regex`, holding at most `LIKE_REGEX_CACHE_SIZE` entries.
// The `unwrap` cannot fail: `LIKE_REGEX_CACHE_SIZE` is a non-zero constant.
static LIKE_REGEX_CACHE: RefCell<LruCache<String, Regex>> =
RefCell::new(LruCache::new(NonZeroUsize::new(LIKE_REGEX_CACHE_SIZE).unwrap()));
}
/// Drops every compiled regex held in the calling thread's LIKE cache.
///
/// Only the current thread's cache is affected, because the cache is
/// declared `thread_local!`.
pub fn clear_like_regex_cache() {
    LIKE_REGEX_CACHE.with(|cell| cell.borrow_mut().clear());
}
/// Returns the compiled regex for `pattern`, consulting the thread-local
/// LRU cache first and compiling (then caching) on a miss.
///
/// `pattern` is regex source text, not a SQL LIKE pattern. Returns `None`
/// when the regex fails to compile; failures are not cached.
fn get_or_compile_like_regex(pattern: &str) -> Option<Regex> {
    LIKE_REGEX_CACHE.with(|cell| {
        let mut lru = cell.borrow_mut();

        // Cache hit: hand back a clone of the stored regex.
        if let Some(hit) = lru.get(pattern).cloned() {
            return Some(hit);
        }

        // Cache miss: compile, remember the result, and return it.
        let compiled = Regex::new(pattern).ok()?;
        lru.put(pattern.to_string(), compiled.clone());
        Some(compiled)
    })
}
/// A `LIKE` / `ILIKE` predicate over a single column, optionally negated.
pub struct LikeExpr {
/// Name of the column whose value is matched against the pattern.
column: String,
/// The original LIKE pattern text (with `%` / `_` wildcards), as supplied.
pattern: String,
/// `true` for the ILIKE variants, `false` for plain LIKE.
case_insensitive: bool,
/// `true` for the NOT LIKE / NOT ILIKE variants.
negated: bool,
/// Index of `column` in the schema; `None` until `prepare_for_schema`
/// resolves it (or if the column was not found).
col_index: Option<usize>,
/// Regex compiled from `pattern`; `None` if compilation failed.
regex: Option<Regex>,
/// Set to `true` once `prepare_for_schema` has been called.
prepared: bool,
}
impl LikeExpr {
pub fn new(column: impl Into<String>, pattern: impl Into<String>) -> Self {
let pattern_str = pattern.into();
let regex = Self::compile_pattern(&pattern_str, false);
Self {
column: column.into(),
pattern: pattern_str,
case_insensitive: false,
negated: false,
col_index: None,
regex,
prepared: false,
}
}
pub fn new_ilike(column: impl Into<String>, pattern: impl Into<String>) -> Self {
let pattern_str = pattern.into();
let regex = Self::compile_pattern(&pattern_str, true);
Self {
column: column.into(),
pattern: pattern_str,
case_insensitive: true,
negated: false,
col_index: None,
regex,
prepared: false,
}
}
pub fn not_like(column: impl Into<String>, pattern: impl Into<String>) -> Self {
let mut expr = Self::new(column, pattern);
expr.negated = true;
expr
}
pub fn not_ilike(column: impl Into<String>, pattern: impl Into<String>) -> Self {
let mut expr = Self::new_ilike(column, pattern);
expr.negated = true;
expr
}
pub fn compile_pattern(pattern: &str, case_insensitive: bool) -> Option<Regex> {
let mut regex_pattern = String::with_capacity(pattern.len() * 2);
regex_pattern.push('^');
let mut chars = pattern.chars().peekable();
while let Some(c) = chars.next() {
match c {
'%' => regex_pattern.push_str(".*"),
'_' => regex_pattern.push('.'),
'\\' => {
if let Some(&next) = chars.peek() {
if next == '%' || next == '_' || next == '\\' {
regex_pattern.push_str(®ex::escape(&next.to_string()));
chars.next();
} else {
regex_pattern.push_str("\\\\");
}
} else {
regex_pattern.push_str("\\\\");
}
}
'.' | '+' | '*' | '?' | '^' | '$' | '(' | ')' | '[' | ']' | '{' | '}' | '|' => {
regex_pattern.push('\\');
regex_pattern.push(c);
}
_ => regex_pattern.push(c),
}
}
regex_pattern.push('$');
let regex_str = if case_insensitive {
format!("(?i){}", regex_pattern)
} else {
regex_pattern
};
get_or_compile_like_regex(®ex_str)
}
fn matches(&self, value: &str) -> bool {
if let Some(ref regex) = self.regex {
regex.is_match(value)
} else {
false
}
}
pub fn get_pattern(&self) -> &str {
&self.pattern
}
pub fn is_case_insensitive(&self) -> bool {
self.case_insensitive
}
pub fn is_negated(&self) -> bool {
self.negated
}
}
/// Renders the expression as SQL-like text, e.g. `name NOT ILIKE 'a%'`.
impl fmt::Debug for LikeExpr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the operator keyword once, then emit a single formatted line.
        let operator = match (self.negated, self.case_insensitive) {
            (true, true) => "NOT ILIKE",
            (true, false) => "NOT LIKE",
            (false, true) => "ILIKE",
            (false, false) => "LIKE",
        };
        write!(f, "{} {} '{}'", self.column, operator, self.pattern)
    }
}
impl Expression for LikeExpr {
    /// Evaluates the predicate against `row`.
    ///
    /// Always returns `Ok`; the result is the same as `evaluate_fast`.
    fn evaluate(&self, row: &Row) -> Result<bool> {
        Ok(self.evaluate_fast(row))
    }

    /// Evaluates the predicate against `row` without a `Result` wrapper.
    ///
    /// Returns `false` (regardless of negation) when the column index has
    /// not been resolved, the row has no value at that index, or the value
    /// is NULL. Non-string values are matched via their `to_string()` form.
    fn evaluate_fast(&self, row: &Row) -> bool {
        let value = match self.col_index.and_then(|idx| row.get(idx)) {
            Some(v) if !v.is_null() => v,
            _ => return false,
        };

        // Borrow the text when the value already is a string; only
        // allocate for values that need converting.
        let text: std::borrow::Cow<'_, str> = match value.as_str() {
            Some(s) => std::borrow::Cow::Borrowed(s),
            None => std::borrow::Cow::Owned(value.to_string()),
        };

        // XOR with `negated`: flips the outcome for NOT LIKE / NOT ILIKE.
        self.matches(&text) != self.negated
    }

    /// Returns a copy of this expression with the column name passed
    /// through `resolve_alias`. The copy is unprepared (no column index).
    fn with_aliases(&self, aliases: &FxHashMap<String, String>) -> Box<dyn Expression> {
        Box::new(LikeExpr {
            column: resolve_alias(&self.column, aliases).to_string(),
            pattern: self.pattern.clone(),
            case_insensitive: self.case_insensitive,
            negated: self.negated,
            col_index: None,
            // The regex depends only on the pattern and case flag, both
            // unchanged here, so reuse it rather than recompiling.
            regex: self.regex.clone(),
            prepared: false,
        })
    }

    /// Resolves the column index against `schema` and marks the expression
    /// as prepared (even if the column was not found).
    fn prepare_for_schema(&mut self, schema: &Schema) {
        self.col_index = find_column_index(schema, &self.column);
        self.prepared = true;
    }

    /// Appends the resolved column index to `out`.
    ///
    /// Returns `false`, leaving `out` untouched, when no index is resolved.
    fn collect_column_indices(&self, out: &mut Vec<usize>) -> bool {
        match self.col_index {
            Some(idx) => {
                out.push(idx);
                true
            }
            None => false,
        }
    }

    /// Returns `true` once `prepare_for_schema` has been called.
    fn is_prepared(&self) -> bool {
        self.prepared
    }

    /// Returns the name of the column this expression tests.
    fn get_column_name(&self) -> Option<&str> {
        Some(&self.column)
    }

    /// Returns `true` unless the pattern begins with the `%` wildcard.
    fn can_use_index(&self) -> bool {
        !self.pattern.starts_with('%')
    }

    /// Returns `(column, literal_prefix, negated)` when the pattern starts
    /// with a non-empty literal prefix.
    ///
    /// Returns `None` for case-insensitive patterns, patterns beginning with
    /// `%`, and patterns with no literal text before the first wildcard.
    ///
    /// The prefix is unescaped the same way `compile_pattern` reads the
    /// pattern: `\%`, `\_` and `\\` contribute a literal `%`, `_` and `\`,
    /// and only an unescaped `%` or `_` ends the prefix. Without this, a
    /// pattern such as `50\%%` would report the prefix `50\`, which no
    /// matching value starts with.
    fn get_like_prefix_info(&self) -> Option<(&str, String, bool)> {
        if self.case_insensitive || self.pattern.starts_with('%') {
            return None;
        }

        let mut prefix = String::with_capacity(self.pattern.len());
        let mut chars = self.pattern.chars().peekable();
        while let Some(c) = chars.next() {
            match c {
                // First unescaped wildcard ends the literal prefix.
                '%' | '_' => break,
                '\\' => match chars.peek().copied() {
                    Some(next) if matches!(next, '%' | '_' | '\\') => {
                        prefix.push(next);
                        chars.next();
                    }
                    // Backslash that escapes nothing is a literal backslash.
                    _ => prefix.push('\\'),
                },
                _ => prefix.push(c),
            }
        }

        if prefix.is_empty() {
            return None;
        }
        Some((&self.column, prefix, self.negated))
    }

    /// Returns a boxed field-by-field copy, preserving the prepared state.
    fn clone_box(&self) -> Box<dyn Expression> {
        Box::new(LikeExpr {
            column: self.column.clone(),
            pattern: self.pattern.clone(),
            case_insensitive: self.case_insensitive,
            negated: self.negated,
            col_index: self.col_index,
            regex: self.regex.clone(),
            prepared: self.prepared,
        })
    }

    /// Exposes `self` as `&dyn Any` so callers can downcast.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, Row, SchemaBuilder, Value};

    /// Schema used by every test: `id` (primary key), `name`, nullable `email`.
    fn test_schema() -> Schema {
        SchemaBuilder::new("test")
            .add_primary_key("id", DataType::Integer)
            .add("name", DataType::Text)
            .add_nullable("email", DataType::Text)
            .build()
    }

    /// Builds a row with the given id and name and a NULL email.
    fn name_row(id: i64, name: &str) -> Row {
        Row::from(vec![
            Value::Integer(id),
            Value::text(name),
            Value::null_unknown(),
        ])
    }

    /// Prepares `expr` against the test schema and returns it.
    fn prepared(mut expr: LikeExpr) -> LikeExpr {
        let schema = test_schema();
        expr.prepare_for_schema(&schema);
        expr
    }

    #[test]
    fn test_like_starts_with() {
        let expr = prepared(LikeExpr::new("name", "John%"));
        assert!(expr.evaluate(&name_row(1, "John")).unwrap());
        assert!(expr.evaluate(&name_row(2, "Johnny")).unwrap());
        assert!(!expr.evaluate(&name_row(3, "Jane")).unwrap());
    }

    #[test]
    fn test_like_ends_with() {
        let expr = prepared(LikeExpr::new("name", "%son"));
        assert!(expr.evaluate(&name_row(1, "Johnson")).unwrap());
        assert!(expr.evaluate(&name_row(2, "Jason")).unwrap());
        assert!(!expr.evaluate(&name_row(3, "John")).unwrap());
    }

    #[test]
    fn test_like_contains() {
        let expr = prepared(LikeExpr::new("name", "%oh%"));
        assert!(expr.evaluate(&name_row(1, "John")).unwrap());
        assert!(expr.evaluate(&name_row(2, "Mohawk")).unwrap());
        assert!(!expr.evaluate(&name_row(3, "Jane")).unwrap());
    }

    #[test]
    fn test_like_single_char() {
        let expr = prepared(LikeExpr::new("name", "J_n"));
        assert!(expr.evaluate(&name_row(1, "Jon")).unwrap());
        assert!(expr.evaluate(&name_row(2, "Jan")).unwrap());
        // `_` stands for exactly one character, so a four-letter name fails.
        assert!(!expr.evaluate(&name_row(3, "John")).unwrap());
    }

    #[test]
    fn test_ilike_case_insensitive() {
        let expr = prepared(LikeExpr::new_ilike("name", "JOHN%"));
        assert!(expr.evaluate(&name_row(1, "john")).unwrap());
        assert!(expr.evaluate(&name_row(2, "JOHN")).unwrap());
        assert!(expr.evaluate(&name_row(3, "JoHn")).unwrap());
    }

    #[test]
    fn test_not_like() {
        let expr = prepared(LikeExpr::not_like("name", "John%"));
        assert!(!expr.evaluate(&name_row(1, "John")).unwrap());
        assert!(expr.evaluate(&name_row(2, "Jane")).unwrap());
    }

    #[test]
    fn test_like_null() {
        let expr = prepared(LikeExpr::new("name", "John%"));
        let null_name = Row::from(vec![
            Value::Integer(1),
            Value::null_unknown(),
            Value::null_unknown(),
        ]);
        assert!(!expr.evaluate(&null_name).unwrap());
    }

    #[test]
    fn test_like_exact_match() {
        let expr = prepared(LikeExpr::new("name", "John"));
        assert!(expr.evaluate(&name_row(1, "John")).unwrap());
        assert!(!expr.evaluate(&name_row(2, "Johnny")).unwrap());
    }

    #[test]
    fn test_like_special_chars() {
        let expr = prepared(LikeExpr::new("name", "test.name%"));
        assert!(expr.evaluate(&name_row(1, "test.name123")).unwrap());
        // The `.` in the pattern is a literal dot, not a regex wildcard.
        assert!(!expr.evaluate(&name_row(2, "testXname123")).unwrap());
    }

    #[test]
    fn test_can_use_index() {
        let prefix_pattern = LikeExpr::new("name", "John%");
        assert!(prefix_pattern.can_use_index());

        let suffix_pattern = LikeExpr::new("name", "%John");
        assert!(!suffix_pattern.can_use_index());

        let contains_pattern = LikeExpr::new("name", "%John%");
        assert!(!contains_pattern.can_use_index());
    }
}