use super::*;
/// Looks up the collation of the column that `e` refers to.
///
/// Returns `None` when `e` is not an `Expr::Column`, or when no column in
/// `ctx.columns` can be picked unambiguously. Lookup order:
///
/// 1. If the reference carries a qualifier, the first column whose name is
///    exactly `"<qualifier>.<name>"`.
/// 2. The first column whose name equals the bare name.
/// 3. The column whose name ends in `".<name>"`, but only when exactly one
///    column does; several candidates yield `None`.
pub(crate) fn column_collation(e: &Expr, ctx: &EvalContext<'_>) -> Option<spg_storage::Collation> {
    let Expr::Column(c) = e else { return None };
    let bare = c.name.as_str();

    // Step 1: exact "<qualifier>.<name>" match.
    if let Some(q) = c.qualifier.as_deref() {
        let qualified_hit = ctx.columns.iter().find(|col| {
            col.name
                .strip_prefix(q)
                .and_then(|rest| rest.strip_prefix('.'))
                == Some(bare)
        });
        if let Some(col) = qualified_hit {
            return Some(col.collation);
        }
    }

    // Step 2: exact bare-name match.
    if let Some(col) = ctx.columns.iter().find(|col| col.name == c.name) {
        return Some(col.collation);
    }

    // Step 3: a single column of the form "<anything>.<name>".
    let mut candidates = ctx.columns.iter().filter(|col| {
        col.name
            .strip_suffix(bare)
            .is_some_and(|head| head.ends_with('.'))
    });
    let only = candidates.next()?;
    if candidates.next().is_some() {
        // More than one candidate: the reference is ambiguous.
        return None;
    }
    Some(only.collation)
}
/// Lower-cases both operands of a comparison when either side is a
/// case-insensitive column.
///
/// The operands `l` and `r` are returned untouched unless `op` is one of
/// `Eq`, `NotEq`, `Lt`, `LtEq`, `Gt`, `GtEq` *and* `column_collation` reports
/// `Collation::CaseInsensitive` for `lhs` or `rhs`. When folding applies,
/// `Value::Text` operands are ASCII-lower-cased; every other variant passes
/// through unchanged.
pub(super) fn collation_fold_for_compare(
    op: BinOp,
    lhs: &Expr,
    rhs: &Expr,
    l: Value,
    r: Value,
    ctx: &EvalContext<'_>,
) -> (Value, Value) {
    /// ASCII-lower-cases text values; leaves all other values as they are.
    fn lowercase_text(v: Value) -> Value {
        match v {
            Value::Text(s) => Value::Text(s.to_ascii_lowercase()),
            other => other,
        }
    }

    let is_comparison = matches!(
        op,
        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
    );
    if !is_comparison {
        return (l, r);
    }

    let is_ci = |side: &Expr| {
        matches!(
            column_collation(side, ctx),
            Some(spg_storage::Collation::CaseInsensitive)
        )
    };
    if is_ci(lhs) || is_ci(rhs) {
        (lowercase_text(l), lowercase_text(r))
    } else {
        (l, r)
    }
}
/// Evaluates `expr` against `row`, borrowing from the row when possible.
///
/// For an `Expr::Column`, the value is returned as `Cow::Borrowed` when
/// `resolve_column_borrowed` finds it; otherwise the lookup falls back to
/// `resolve_column` and the result is returned as `Cow::Owned`. Every other
/// expression kind is delegated to `eval_expr` and returned owned.
///
/// # Errors
///
/// Propagates any `EvalError` produced by the resolver or evaluator it calls.
pub(super) fn eval_expr_cow<'r>(
    expr: &Expr,
    row: &'r Row,
    ctx: &EvalContext<'_>,
) -> Result<Cow<'r, Value>, EvalError> {
    let Expr::Column(c) = expr else {
        return eval_expr(expr, row, ctx).map(Cow::Owned);
    };
    if let Some(borrowed) = resolve_column_borrowed(c, row, ctx)? {
        return Ok(Cow::Borrowed(borrowed));
    }
    // The borrowing lookup found nothing; use the owning resolver instead.
    resolve_column(c, row, ctx).map(Cow::Owned)
}
/// Returns `true` when `v` is a `Value::Numeric` or `Value::Interval`.
#[inline]
pub(super) fn is_owned_compare_value(v: &Value) -> bool {
    match v {
        Value::Numeric { .. } | Value::Interval { .. } => true,
        _ => false,
    }
}
/// Reports whether a comparison between `lhs` and `rhs` involves a
/// case-insensitive column.
///
/// Returns `true` when `column_collation` yields
/// `Collation::CaseInsensitive` for at least one of the two expressions.
#[inline]
pub(super) fn compare_is_case_insensitive(lhs: &Expr, rhs: &Expr, ctx: &EvalContext<'_>) -> bool {
    let is_ci = |side: &Expr| {
        matches!(
            column_collation(side, ctx),
            Some(spg_storage::Collation::CaseInsensitive)
        )
    };
    is_ci(lhs) || is_ci(rhs)
}
/// Tests whether `schema_name` is exactly `"<qualifier>.<name>"`.
///
/// The comparison is done piecewise, so no concatenated string is built.
#[inline]
pub(super) fn composite_eq(schema_name: &str, qualifier: &str, name: &str) -> bool {
    schema_name
        .strip_prefix(qualifier)
        .and_then(|after_qualifier| after_qualifier.strip_prefix('.'))
        == Some(name)
}
/// Finds the index in `ctx.columns` of the column that `c` names.
///
/// When `c` has a qualifier, the first column named exactly
/// `"<qualifier>.<name>"` wins. If there is no qualifier, or no such column
/// exists, the first column whose name equals the bare name is used.
/// Returns `None` when neither lookup matches.
pub(crate) fn find_column_pos(c: &ColumnName, ctx: &EvalContext<'_>) -> Option<usize> {
    let qualified = c.qualifier.as_ref().and_then(|q| {
        ctx.columns
            .iter()
            .position(|col| composite_eq(&col.name, q, &c.name))
    });
    qualified.or_else(|| ctx.columns.iter().position(|col| col.name == c.name))
}
/// Resolves `c` to a reference into `row` without cloning the value.
///
/// The column index is searched first by `"<qualifier>.<name>"` (when a
/// qualifier is present) and then by bare name. The result is `Ok(None)` when
/// no column matches, or when the matched index has no entry in
/// `row.values`. This function never returns `Err` itself.
pub(super) fn resolve_column_borrowed<'r>(
    c: &ColumnName,
    row: &'r Row,
    ctx: &EvalContext<'_>,
) -> Result<Option<&'r Value>, EvalError> {
    let qualified_pos = match &c.qualifier {
        Some(q) => ctx
            .columns
            .iter()
            .position(|col| composite_eq(&col.name, q, &c.name)),
        None => None,
    };
    let pos = match qualified_pos {
        Some(found) => Some(found),
        None => ctx.columns.iter().position(|col| col.name == c.name),
    };
    match pos {
        // `get` keeps a too-short row from panicking; it simply yields `None`.
        Some(index) => Ok(row.values.get(index)),
        None => Ok(None),
    }
}
/// Returns a prefix of `t`, measured in `char`s rather than bytes.
///
/// * `n >= 0` keeps the first `n` characters (the whole string if it has
///   fewer than `n`).
/// * `n < 0` drops the last `|n|` characters (an empty string if `t` has
///   `|n|` characters or fewer).
///
/// Any `n` is accepted, including `i64::MIN` and `i64::MAX`.
pub(super) fn text_prefix_chars(t: &str, n: i64) -> String {
    let keep = if n >= 0 {
        usize::try_from(n).unwrap_or(usize::MAX)
    } else {
        // `unsigned_abs` rather than `-n`: negating `i64::MIN` overflows,
        // which panics when overflow checks are enabled.
        let drop_tail = usize::try_from(n.unsigned_abs()).unwrap_or(usize::MAX);
        t.chars().count().saturating_sub(drop_tail)
    };
    // The byte offset of the `keep`-th character is where the prefix ends;
    // if there is no such character the whole string is the prefix.
    match t.char_indices().nth(keep) {
        Some((byte_idx, _)) => t[..byte_idx].into(),
        None => t.into(),
    }
}
/// Resolves the column reference `c` to an owned copy of its value in `row`.
///
/// Lookup order:
///
/// 1. With a qualifier: the first column named exactly
///    `"<qualifier>.<name>"`.
/// 2. The first column whose name equals the bare name.
/// 3. The single column whose name ends in `".<name>"`.
///
/// # Errors
///
/// * `EvalError::ColumnNotFound` — the qualifier prefixes at least one
///   column but `"<qualifier>.<name>"` is not among them, or no column
///   matches at all.
/// * `EvalError::UnknownQualifier` — the qualifier prefixes no column and
///   does not equal `ctx.table_alias` (or there is no table alias).
/// * `EvalError::TypeMismatch` — more than one column ends in `".<name>"`,
///   so the reference is ambiguous.
///
/// # Panics
///
/// Indexes `row.values` directly, so it panics if the matched column
/// position is beyond the end of `row.values`.
pub(super) fn resolve_column(
    c: &ColumnName,
    row: &Row,
    ctx: &EvalContext<'_>,
) -> Result<Value, EvalError> {
    let value_at = |pos: usize| row.values[pos].clone();

    if let Some(q) = &c.qualifier {
        let exact = ctx
            .columns
            .iter()
            .position(|col| composite_eq(&col.name, q, &c.name));
        if let Some(pos) = exact {
            return Ok(value_at(pos));
        }

        // The qualifier is known (some column carries it) but this
        // particular column is missing under it.
        let prefix = alloc::format!("{q}.");
        let qualifier_in_use = ctx.columns.iter().any(|col| col.name.starts_with(&prefix));
        if qualifier_in_use {
            return Err(EvalError::ColumnNotFound {
                name: alloc::format!("{q}.{name}", name = c.name),
            });
        }

        // Otherwise the qualifier is only acceptable if it is the table alias;
        // in that case fall through to the unqualified lookups below.
        match ctx.table_alias {
            Some(expected) if q == expected => {}
            _ => {
                return Err(EvalError::UnknownQualifier {
                    qualifier: q.clone(),
                });
            }
        }
    }

    if let Some(pos) = ctx.columns.iter().position(|col| col.name == c.name) {
        return Ok(value_at(pos));
    }

    // Last resort: accept "<anything>.<name>" provided it is unambiguous.
    let suffix = alloc::format!(".{name}", name = c.name);
    let mut hits = ctx
        .columns
        .iter()
        .enumerate()
        .filter(|(_, col)| col.name.ends_with(&suffix))
        .map(|(pos, _)| pos);

    let Some(pos) = hits.next() else {
        return Err(EvalError::ColumnNotFound {
            name: c.name.clone(),
        });
    };
    if hits.next().is_some() {
        return Err(EvalError::TypeMismatch {
            detail: alloc::format!("ambiguous column reference: {}", c.name),
        });
    }
    Ok(value_at(pos))
}