Skip to main content

nodedb_sql/resolver/
columns.rs

//! Column and table resolution against the catalog.
2
3use std::collections::HashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::normalize::table_name_from_factor;
7use crate::types::{CollectionInfo, EngineType, SqlCatalog};
8
9/// Resolved table reference: name, alias, and catalog info.
10#[derive(Debug, Clone)]
11pub struct ResolvedTable {
12    pub name: String,
13    pub alias: Option<String>,
14    pub info: CollectionInfo,
15}
16
17impl ResolvedTable {
18    /// The name to use for qualified column references.
19    pub fn ref_name(&self) -> &str {
20        self.alias.as_deref().unwrap_or(&self.name)
21    }
22}
23
24/// Context built during FROM clause resolution.
25#[derive(Debug, Default)]
26pub struct TableScope {
27    /// Tables by reference name (alias or table name).
28    pub tables: HashMap<String, ResolvedTable>,
29    /// Insertion order for unambiguous column resolution.
30    order: Vec<String>,
31}
32
33impl TableScope {
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Add a resolved table. Returns error if name conflicts.
39    pub fn add(&mut self, table: ResolvedTable) -> Result<()> {
40        let key = table.ref_name().to_string();
41        if self.tables.contains_key(&key) {
42            return Err(SqlError::Parse {
43                detail: format!("duplicate table reference: {key}"),
44            });
45        }
46        self.order.push(key.clone());
47        self.tables.insert(key, table);
48        Ok(())
49    }
50
51    /// Resolve a column name, optionally qualified with a table reference.
52    ///
53    /// For schemaless collections, any column is accepted (dynamic fields).
54    /// For typed collections, the column must exist in the schema.
55    pub fn resolve_column(
56        &self,
57        table_ref: Option<&str>,
58        column: &str,
59    ) -> Result<(String, String)> {
60        let col = column.to_lowercase();
61
62        if let Some(tref) = table_ref {
63            let tref_lower = tref.to_lowercase();
64            let table = self
65                .tables
66                .get(&tref_lower)
67                .ok_or_else(|| SqlError::UnknownTable {
68                    name: tref_lower.clone(),
69                })?;
70            self.validate_column(table, &col)?;
71            return Ok((table.name.clone(), col));
72        }
73
74        // Unqualified: search all tables.
75        let mut matches = Vec::new();
76        for key in &self.order {
77            let table = &self.tables[key];
78            if self.column_exists(table, &col) {
79                matches.push(table.name.clone());
80            }
81        }
82
83        match matches.len() {
84            0 => {
85                // For single-table queries with schemaless, accept anything.
86                if self.tables.len() == 1 {
87                    let table = self.tables.values().next().unwrap();
88                    if table.info.engine == EngineType::DocumentSchemaless {
89                        return Ok((table.name.clone(), col));
90                    }
91                }
92                Err(SqlError::UnknownColumn {
93                    table: self
94                        .order
95                        .first()
96                        .cloned()
97                        .unwrap_or_else(|| "<unknown>".into()),
98                    column: col,
99                })
100            }
101            1 => Ok((matches.into_iter().next().unwrap(), col)),
102            _ => Err(SqlError::AmbiguousColumn { column: col }),
103        }
104    }
105
106    fn column_exists(&self, table: &ResolvedTable, column: &str) -> bool {
107        // Schemaless accepts any column.
108        if table.info.engine == EngineType::DocumentSchemaless {
109            return true;
110        }
111        table.info.columns.iter().any(|c| c.name == column)
112    }
113
114    fn validate_column(&self, table: &ResolvedTable, column: &str) -> Result<()> {
115        if self.column_exists(table, column) {
116            Ok(())
117        } else {
118            Err(SqlError::UnknownColumn {
119                table: table.name.clone(),
120                column: column.into(),
121            })
122        }
123    }
124
125    /// Get the single table in scope (for single-table queries).
126    pub fn single_table(&self) -> Option<&ResolvedTable> {
127        if self.tables.len() == 1 {
128            self.tables.values().next()
129        } else {
130            Option::None
131        }
132    }
133
134    /// Resolve tables from a FROM clause.
135    pub fn resolve_from(
136        catalog: &dyn SqlCatalog,
137        from: &[sqlparser::ast::TableWithJoins],
138    ) -> Result<Self> {
139        let mut scope = Self::new();
140        for table_with_joins in from {
141            scope.resolve_table_factor(catalog, &table_with_joins.relation)?;
142            for join in &table_with_joins.joins {
143                scope.resolve_table_factor(catalog, &join.relation)?;
144            }
145        }
146        Ok(scope)
147    }
148
149    fn resolve_table_factor(
150        &mut self,
151        catalog: &dyn SqlCatalog,
152        factor: &sqlparser::ast::TableFactor,
153    ) -> Result<()> {
154        if let Some((name, alias)) = table_name_from_factor(factor) {
155            let info = catalog
156                .get_collection(&name)?
157                .ok_or_else(|| SqlError::UnknownTable { name: name.clone() })?;
158            self.add(ResolvedTable { name, alias, info })?;
159        }
160        Ok(())
161    }
162}