use std::fmt;
/// A literal value that can appear on the right-hand side of a [`Predicate`].
///
/// The `Display` impl renders the value as a SQL literal suitable for
/// embedding in generated predicate text.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarValue {
    /// SQL `NULL`.
    Null,
    /// Boolean literal, rendered as `true` / `false`.
    Bool(bool),
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float, rendered with Rust's `Display` formatting.
    Float64(f64),
    /// UTF-8 string, rendered as a single-quoted literal with `'` doubled.
    Utf8(String),
    /// Raw bytes, rendered as a lowercase hex literal `X'..'`.
    Binary(Vec<u8>),
    /// Timestamp stored as an integer count of microseconds.
    ///
    /// NOTE(review): rendered as microseconds since the Unix epoch in UTC
    /// (the value was historically bound as `us`); confirm against producers.
    Timestamp(i64),
}

impl fmt::Display for ScalarValue {
    /// Writes the value as a SQL literal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Null => write!(f, "NULL"),
            Self::Bool(v) => write!(f, "{v}"),
            Self::Int64(v) => write!(f, "{v}"),
            // NOTE(review): non-finite values render as `NaN` / `inf` / `-inf`,
            // which standard SQL does not accept as numeric literals.
            Self::Float64(v) => write!(f, "{v}"),
            // Standard SQL escapes a single quote inside a literal by doubling it.
            Self::Utf8(v) => write!(f, "'{}'", v.replace('\'', "''")),
            Self::Binary(v) => write!(f, "X'{}'", hex_encode(v)),
            // A bare integer is not a valid timestamp literal; emit an ISO-style
            // date-time string instead.
            Self::Timestamp(us) => write!(f, "TIMESTAMP '{}'", format_timestamp_micros(*us)),
        }
    }
}

/// Formats microseconds since the Unix epoch as `YYYY-MM-DD HH:MM:SS[.ffffff]` (UTC).
///
/// The six-digit fractional part is emitted only when the value is not a
/// whole second. Euclidean division keeps negative (pre-epoch) values on the
/// correct calendar day instead of truncating toward zero.
fn format_timestamp_micros(micros: i64) -> String {
    const MICROS_PER_SECOND: i64 = 1_000_000;
    const SECONDS_PER_DAY: i64 = 86_400;

    let seconds = micros.div_euclid(MICROS_PER_SECOND);
    let fraction = micros.rem_euclid(MICROS_PER_SECOND);
    let days = seconds.div_euclid(SECONDS_PER_DAY);
    let second_of_day = seconds.rem_euclid(SECONDS_PER_DAY);

    let (year, month, day) = civil_from_days(days);
    let hour = second_of_day / 3_600;
    let minute = second_of_day % 3_600 / 60;
    let second = second_of_day % 60;

    if fraction == 0 {
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}")
    } else {
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}.{fraction:06}")
    }
}

/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple.
///
/// This is Howard Hinnant's `civil_from_days`: days are re-based to
/// 0000-03-01 so each leap day falls at the end of a March-based year, and
/// the calendar repeats every 400-year era of 146 097 days.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February belong to the following civil year.
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut s, b| {
            // Writing into a `String` cannot fail.
            let _ = write!(s, "{b:02x}");
            s
        })
}
/// A single-column filter condition that may be pushed down to a data source.
///
/// Every variant names exactly one column. `predicate_to_sql` renders the
/// comparison variants as `"column" <op> value` with the operators
/// `=`, `!=`, `<`, `<=`, `>`, `>=` respectively.
#[derive(Debug, Clone, PartialEq)]
pub enum Predicate {
/// `column = value`.
Eq {
column: String,
value: ScalarValue,
},
/// `column != value`; `split_predicates` always keeps this variant local.
NotEq {
column: String,
value: ScalarValue,
},
/// `column < value`.
Lt {
column: String,
value: ScalarValue,
},
/// `column <= value`.
LtEq {
column: String,
value: ScalarValue,
},
/// `column > value`.
Gt {
column: String,
value: ScalarValue,
},
/// `column >= value`.
GtEq {
column: String,
value: ScalarValue,
},
/// `column IN (values...)`.
In {
column: String,
values: Vec<ScalarValue>,
},
/// `column IS NULL`.
IsNull {
column: String,
},
/// `column IS NOT NULL`.
IsNotNull {
column: String,
},
}
impl Predicate {
    /// Returns the name of the column this predicate constrains.
    ///
    /// Every variant carries exactly one column, so this is infallible.
    #[must_use]
    pub fn column(&self) -> &str {
        let name = match self {
            // Null checks carry no operand besides the column.
            Self::IsNull { column } | Self::IsNotNull { column } => column,
            // Comparison and membership variants also carry value(s).
            Self::Eq { column, .. }
            | Self::NotEq { column, .. }
            | Self::Lt { column, .. }
            | Self::LtEq { column, .. }
            | Self::Gt { column, .. }
            | Self::GtEq { column, .. }
            | Self::In { column, .. } => column,
        };
        name.as_str()
    }
}
/// Describes which predicates a data source can evaluate on its own.
///
/// Consulted by `split_predicates`, which matches column names by exact string
/// equality. The `Default` value (empty column lists,
/// `supports_null_check == false`) causes every predicate to stay local.
#[derive(Debug, Clone, Default)]
pub struct SourceCapabilities {
/// Columns on which an `Eq` predicate can be pushed down.
pub eq_columns: Vec<String>,
/// Columns on which `Lt`, `LtEq`, `Gt` and `GtEq` predicates can be pushed down.
pub range_columns: Vec<String>,
/// Columns on which an `In` predicate can be pushed down.
pub in_columns: Vec<String>,
/// Whether `IsNull` / `IsNotNull` predicates can be pushed down, for any column.
pub supports_null_check: bool,
}
/// Output of `split_predicates`: the input predicates partitioned by whether
/// the source can evaluate them.
#[derive(Debug, Clone)]
pub struct SplitPredicates {
/// Predicates the source can evaluate, in their original input order.
pub pushable: Vec<Predicate>,
/// Predicates that must be evaluated locally, in their original input order.
pub local: Vec<Predicate>,
}
/// Partitions `predicates` into those the source can evaluate itself and
/// those that must be applied locally, according to `capabilities`.
///
/// Column names are compared by exact string equality. `NotEq` is never
/// pushed down. Both output lists preserve the relative order of the input.
#[must_use]
pub fn split_predicates(
    predicates: Vec<Predicate>,
    capabilities: &SourceCapabilities,
) -> SplitPredicates {
    // True when `name` appears in the source's list of supported columns.
    fn listed(allowed: &[String], name: &str) -> bool {
        allowed.iter().any(|candidate| candidate == name)
    }

    let (pushable, local): (Vec<Predicate>, Vec<Predicate>) =
        predicates.into_iter().partition(|pred| match pred {
            Predicate::NotEq { .. } => false,
            Predicate::IsNull { .. } | Predicate::IsNotNull { .. } => {
                capabilities.supports_null_check
            }
            Predicate::Eq { column, .. } => listed(&capabilities.eq_columns, column),
            Predicate::In { column, .. } => listed(&capabilities.in_columns, column),
            Predicate::Lt { column, .. }
            | Predicate::LtEq { column, .. }
            | Predicate::Gt { column, .. }
            | Predicate::GtEq { column, .. } => listed(&capabilities.range_columns, column),
        });

    SplitPredicates { pushable, local }
}
/// Renders a [`Predicate`] as a SQL boolean expression.
///
/// The column is emitted as a double-quoted identifier, with any embedded `"`
/// doubled so the name cannot terminate the identifier early. Values are
/// rendered through [`ScalarValue`]'s `Display` impl.
///
/// An `In` predicate with an empty value list is rendered as `1 = 0`. An empty
/// `IN ()` list is a syntax error in most SQL engines, and membership in an
/// empty list is false for every row.
#[must_use]
pub fn predicate_to_sql(predicate: &Predicate) -> String {
    // Escape an identifier for use between double quotes.
    let q = |col: &str| col.replace('"', "\"\"");
    match predicate {
        Predicate::Eq { column, value } => format!("\"{}\" = {value}", q(column)),
        Predicate::NotEq { column, value } => format!("\"{}\" != {value}", q(column)),
        Predicate::Lt { column, value } => format!("\"{}\" < {value}", q(column)),
        Predicate::LtEq { column, value } => format!("\"{}\" <= {value}", q(column)),
        Predicate::Gt { column, value } => format!("\"{}\" > {value}", q(column)),
        Predicate::GtEq { column, value } => format!("\"{}\" >= {value}", q(column)),
        // Must precede the general `In` arm so `IN ()` is never emitted.
        Predicate::In { values, .. } if values.is_empty() => "1 = 0".to_string(),
        Predicate::In { column, values } => {
            let vals: Vec<String> = values.iter().map(ToString::to_string).collect();
            format!("\"{}\" IN ({})", q(column), vals.join(", "))
        }
        Predicate::IsNull { column } => format!("\"{}\" IS NULL", q(column)),
        Predicate::IsNotNull { column } => format!("\"{}\" IS NOT NULL", q(column)),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_scalar_value_display() {
        assert_eq!(ScalarValue::Null.to_string(), "NULL");
        assert_eq!(ScalarValue::Bool(true).to_string(), "true");
        assert_eq!(ScalarValue::Int64(42).to_string(), "42");
        assert_eq!(ScalarValue::Float64(1.23).to_string(), "1.23");
        assert_eq!(ScalarValue::Utf8("hello".into()).to_string(), "'hello'");
        assert_eq!(ScalarValue::Binary(vec![0xDE, 0xAD]).to_string(), "X'dead'");
    }

    #[test]
    fn test_predicate_column() {
        let pred = Predicate::Eq {
            column: "id".into(),
            value: ScalarValue::Int64(1),
        };
        assert_eq!(pred.column(), "id");
        let pred = Predicate::IsNull {
            column: "name".into(),
        };
        assert_eq!(pred.column(), "name");
    }

    #[test]
    fn test_predicate_to_sql() {
        assert_eq!(
            predicate_to_sql(&Predicate::Eq {
                column: "id".into(),
                value: ScalarValue::Int64(42),
            }),
            "\"id\" = 42"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::In {
                column: "status".into(),
                values: vec![
                    ScalarValue::Utf8("active".into()),
                    ScalarValue::Utf8("pending".into()),
                ],
            }),
            "\"status\" IN ('active', 'pending')"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::Gt {
                column: "order".into(),
                value: ScalarValue::Int64(10),
            }),
            "\"order\" > 10"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::IsNull {
                column: "deleted_at".into(),
            }),
            "\"deleted_at\" IS NULL"
        );
    }

    #[test]
    fn test_predicate_to_sql_remaining_operators() {
        assert_eq!(
            predicate_to_sql(&Predicate::NotEq {
                column: "a".into(),
                value: ScalarValue::Int64(1),
            }),
            "\"a\" != 1"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::Lt {
                column: "a".into(),
                value: ScalarValue::Int64(1),
            }),
            "\"a\" < 1"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::LtEq {
                column: "a".into(),
                value: ScalarValue::Int64(1),
            }),
            "\"a\" <= 1"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::GtEq {
                column: "a".into(),
                value: ScalarValue::Int64(1),
            }),
            "\"a\" >= 1"
        );
        assert_eq!(
            predicate_to_sql(&Predicate::IsNotNull { column: "a".into() }),
            "\"a\" IS NOT NULL"
        );
    }

    #[test]
    fn test_predicate_to_sql_escapes_double_quotes_in_column() {
        assert_eq!(
            predicate_to_sql(&Predicate::Eq {
                column: "we\"ird".into(),
                value: ScalarValue::Int64(1),
            }),
            "\"we\"\"ird\" = 1"
        );
    }

    #[test]
    fn test_split_predicates() {
        let capabilities = SourceCapabilities {
            eq_columns: vec!["id".into(), "name".into()],
            range_columns: vec!["created_at".into()],
            in_columns: vec!["status".into()],
            supports_null_check: false,
        };
        let predicates = vec![
            Predicate::Eq {
                column: "id".into(),
                value: ScalarValue::Int64(1),
            },
            Predicate::Gt {
                column: "created_at".into(),
                value: ScalarValue::Timestamp(1_000_000),
            },
            Predicate::IsNull {
                column: "deleted_at".into(),
            },
            Predicate::In {
                column: "status".into(),
                values: vec![ScalarValue::Utf8("active".into())],
            },
            Predicate::Eq {
                column: "region".into(),
                value: ScalarValue::Utf8("us-east".into()),
            },
        ];
        let split = split_predicates(predicates, &capabilities);
        assert_eq!(split.pushable.len(), 3);
        assert_eq!(split.local.len(), 2);

        // Both halves keep the input order.
        let pushed: Vec<&str> = split.pushable.iter().map(Predicate::column).collect();
        assert_eq!(pushed, vec!["id", "created_at", "status"]);
        let kept: Vec<&str> = split.local.iter().map(Predicate::column).collect();
        assert_eq!(kept, vec!["deleted_at", "region"]);
    }

    #[test]
    fn test_scalar_value_display_escapes_single_quotes() {
        assert_eq!(
            ScalarValue::Utf8("O'Brien".into()).to_string(),
            "'O''Brien'"
        );
        assert_eq!(
            ScalarValue::Utf8(r#"say "hello""#.into()).to_string(),
            r#"'say "hello"'"#
        );
        assert_eq!(ScalarValue::Utf8("it''s".into()).to_string(), "'it''''s'");
        assert_eq!(ScalarValue::Utf8(String::new()).to_string(), "''");
    }

    #[test]
    fn test_not_eq_never_pushed_down() {
        let capabilities = SourceCapabilities {
            eq_columns: vec!["id".into()],
            range_columns: vec![],
            in_columns: vec![],
            supports_null_check: false,
        };
        let predicates = vec![
            Predicate::Eq {
                column: "id".into(),
                value: ScalarValue::Int64(1),
            },
            Predicate::NotEq {
                column: "id".into(),
                value: ScalarValue::Int64(2),
            },
        ];
        let split = split_predicates(predicates, &capabilities);
        assert_eq!(split.pushable.len(), 1);
        assert!(matches!(&split.pushable[0], Predicate::Eq { .. }));
        assert_eq!(split.local.len(), 1);
        assert!(matches!(&split.local[0], Predicate::NotEq { .. }));
    }

    #[test]
    fn test_null_checks_pushed_when_supported() {
        let capabilities = SourceCapabilities {
            supports_null_check: true,
            ..SourceCapabilities::default()
        };
        let predicates = vec![
            Predicate::IsNull { column: "a".into() },
            Predicate::IsNotNull { column: "b".into() },
        ];
        let split = split_predicates(predicates, &capabilities);
        assert_eq!(split.pushable.len(), 2);
        assert!(split.local.is_empty());
    }

    #[test]
    fn test_split_predicates_empty_capabilities() {
        let capabilities = SourceCapabilities::default();
        let predicates = vec![Predicate::Eq {
            column: "id".into(),
            value: ScalarValue::Int64(1),
        }];
        let split = split_predicates(predicates, &capabilities);
        assert!(split.pushable.is_empty());
        assert_eq!(split.local.len(), 1);
    }

    #[test]
    fn test_split_predicates_empty_input() {
        let split = split_predicates(Vec::new(), &SourceCapabilities::default());
        assert!(split.pushable.is_empty());
        assert!(split.local.is_empty());
    }
}