use crate::catalog::{TableCatalog, TableMetadataView};
use crate::types::TableId;
use llkv_column_map::store::ROW_ID_COLUMN_NAME;
use llkv_expr::expr::ScalarExpr;
use llkv_result::{Error, Result};
use rustc_hash::FxHashSet;
/// Per-lookup settings handed to [`IdentifierResolver::resolve`].
///
/// Built with [`IdentifierContext::new`] and refined through the `with_*`
/// builder methods.
#[derive(Clone, Debug, Default)]
pub struct IdentifierContext {
/// Table that identifiers are resolved against. When `None`, `resolve`
/// performs no catalog lookup and returns the dotted identifier as-is.
default_table_id: Option<TableId>,
/// Alias of the default table; `with_table_alias` stores it ASCII-lowercased.
table_alias: Option<String>,
/// Optional set of column names consulted by `has_available_column_for`.
available_columns: Option<FxHashSet<String>>,
}
impl IdentifierContext {
    /// Creates a context bound to `default_table_id`, with no table alias and
    /// no tracked column set.
    pub fn new(default_table_id: Option<TableId>) -> Self {
        Self {
            default_table_id,
            ..Self::default()
        }
    }

    /// Sets (or clears, when `alias` is `None`) the table alias.
    ///
    /// The alias is stored ASCII-lowercased.
    pub fn with_table_alias(self, alias: Option<String>) -> Self {
        Self {
            table_alias: alias.as_deref().map(str::to_ascii_lowercase),
            ..self
        }
    }

    /// Enables column tracking using `columns` as the set of known names.
    ///
    /// The set is stored exactly as given, while
    /// [`has_available_column_for`](Self::has_available_column_for) lowercases
    /// the identifier before looking it up.
    // NOTE(review): presumably callers pass already-lowercased names — confirm.
    pub fn with_available_columns(self, columns: FxHashSet<String>) -> Self {
        Self {
            available_columns: Some(columns),
            ..self
        }
    }

    /// Table that identifiers are resolved against, if any.
    pub fn default_table_id(&self) -> Option<TableId> {
        self.default_table_id
    }

    /// Lowercased alias of the default table, if one was set.
    pub fn table_alias(&self) -> Option<&str> {
        self.table_alias.as_deref()
    }

    /// Returns `true` once a column set has been supplied via
    /// [`with_available_columns`](Self::with_available_columns).
    pub fn tracks_available_columns(&self) -> bool {
        self.available_columns.is_some()
    }

    /// Reports whether the final component of `parts`, ASCII-lowercased, is in
    /// the tracked column set.
    ///
    /// Returns `false` when no column set is tracked or `parts` is empty.
    pub fn has_available_column_for(&self, parts: &[String]) -> bool {
        match (self.available_columns.as_ref(), parts.last()) {
            (Some(known), Some(column)) => known.contains(&column.to_ascii_lowercase()),
            _ => false,
        }
    }
}
/// Outcome of resolving a compound identifier: a base column plus an optional
/// chain of nested field names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnResolution {
/// Name of the base column.
column: String,
/// Field names applied to the column, in order; empty for a plain column.
field_path: Vec<String>,
}
impl ColumnResolution {
    /// Name of the base column.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }

    /// Returns this resolution with the base column replaced by `column`; the
    /// field path is kept.
    pub fn with_column(self, column: String) -> Self {
        Self { column, ..self }
    }

    /// Nested field names that follow the base column, outermost first.
    pub fn field_path(&self) -> &[String] {
        self.field_path.as_slice()
    }

    /// Returns `true` when the resolution is a plain column without any
    /// nested field access.
    pub fn is_simple(&self) -> bool {
        self.field_path.is_empty()
    }

    /// Converts the resolution into an expression: a column reference wrapped
    /// in one `get_field` per entry of the field path, applied in order.
    pub fn into_scalar_expr(self) -> ScalarExpr<String> {
        let Self { column, field_path } = self;
        field_path
            .into_iter()
            .fold(ScalarExpr::column(column), ScalarExpr::get_field)
    }
}
/// Resolves compound identifiers to columns using a borrowed [`TableCatalog`].
///
/// Obtained through [`TableCatalog::identifier_resolver`].
pub struct IdentifierResolver<'a> {
/// Catalog queried for table metadata and field resolvers.
catalog: &'a TableCatalog,
}
impl<'a> IdentifierResolver<'a> {
pub fn resolve(
&self,
parts: &[String],
context: IdentifierContext,
) -> Result<ColumnResolution> {
if parts.is_empty() {
return Err(Error::InvalidArgumentError(
"invalid compound identifier".into(),
));
}
if context.default_table_id().is_none() {
return Ok(ColumnResolution {
column: parts.join("."),
field_path: Vec::new(),
});
}
let table_id = context.default_table_id().unwrap();
let table_meta = self.catalog.table_metadata_view(table_id).ok_or_else(|| {
Error::CatalogError(format!(
"Catalog Error: table id {} is not registered",
table_id
))
})?;
let mut effective_parts = parts;
if let Some(alias) = context.table_alias() {
if !effective_parts.is_empty() && effective_parts[0].eq_ignore_ascii_case(alias) {
if effective_parts.len() == 1 {
return Err(Error::InvalidArgumentError(format!(
"Binder Error: does not have a column named '{}'",
effective_parts[0]
)));
}
effective_parts = &effective_parts[1..];
} else if !effective_parts.is_empty() {
let canonical_table = table_meta.canonical_table();
if effective_parts[0].eq_ignore_ascii_case(canonical_table) {
return Err(Error::InvalidArgumentError(format!(
"Binder Error: table '{}' not found",
effective_parts[0]
)));
}
if effective_parts.len() >= 2
&& let Some(schema) = table_meta.canonical_schema()
&& effective_parts[0].eq_ignore_ascii_case(schema)
&& effective_parts[1].eq_ignore_ascii_case(canonical_table)
{
return Err(Error::InvalidArgumentError(format!(
"Binder Error: table '{}' not found",
effective_parts[1]
)));
}
}
}
let alias_active = context.table_alias().is_some();
let resolution = resolve_identifier_parts(effective_parts, &table_meta, alias_active)?;
if let Some(field_resolver) = self.catalog.field_resolver(table_id) {
if field_resolver.field_exists(resolution.column()) {
return Ok(resolution);
}
if resolution.field_path().is_empty()
&& let Some(canonical) = canonicalize_rowid_alias(resolution.column())
{
return Ok(resolution.with_column(canonical.to_string()));
}
return Err(Error::InvalidArgumentError(format!(
"Binder Error: does not have a column named '{}'",
resolution.column()
)));
}
Ok(resolution)
}
}
fn resolve_identifier_parts(
parts: &[String],
table_meta: &TableMetadataView,
alias_active: bool,
) -> Result<ColumnResolution> {
let canonical_table = table_meta.canonical_table().to_string();
let canonical_schema = table_meta.canonical_schema().map(str::to_string);
let canonical_parts: Vec<String> = parts.iter().map(|part| part.to_ascii_lowercase()).collect();
let mut column_start_idx = 0usize;
if !alias_active {
if let Some(schema) = canonical_schema.as_deref() {
let schema_then_table = canonical_parts.len() == 2
&& canonical_parts[0] == schema
&& canonical_parts[1] == canonical_table;
let starts_with_table =
canonical_parts.len() >= 2 && canonical_parts[0] == canonical_table;
if canonical_parts.len() >= 3
&& canonical_parts[0] == schema
&& canonical_parts[1] == canonical_table
{
column_start_idx = 2;
} else if schema_then_table || starts_with_table {
column_start_idx = 1;
} else if canonical_parts.len() >= 3 && canonical_parts[1] == canonical_table {
return Err(Error::InvalidArgumentError(format!(
"Binder Error: table '{}' not found",
parts[0]
)));
} else if canonical_parts.len() >= 3 && canonical_parts[0] == schema {
return Err(Error::InvalidArgumentError(format!(
"Binder Error: table '{}' not found",
parts[1]
)));
}
} else if canonical_parts.len() >= 2 && canonical_parts[0] == canonical_table {
column_start_idx = 1;
}
}
if column_start_idx >= parts.len() {
return Err(Error::InvalidArgumentError(format!(
"invalid column reference in identifier: {}",
parts.join(".")
)));
}
let column = parts[column_start_idx].clone();
let mut field_path = Vec::new();
if column_start_idx + 1 < parts.len() {
field_path.extend(parts[(column_start_idx + 1)..].iter().cloned());
}
Ok(ColumnResolution { column, field_path })
}
impl TableCatalog {
/// Returns an [`IdentifierResolver`] that borrows this catalog.
pub fn identifier_resolver(&self) -> IdentifierResolver<'_> {
IdentifierResolver { catalog: self }
}
}
/// Maps any ASCII-case variant of [`ROW_ID_COLUMN_NAME`] to the canonical
/// constant.
///
/// Returns `None` for every other name.
pub fn canonicalize_rowid_alias(name: &str) -> Option<&'static str> {
    name.eq_ignore_ascii_case(ROW_ID_COLUMN_NAME)
        .then_some(ROW_ID_COLUMN_NAME)
}