Skip to main content

nodedb_sql/resolver/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Column and table resolution against the catalog.
4
5use std::collections::HashMap;
6
7use nodedb_types::DatabaseId;
8
9use crate::error::{Result, SqlError};
10use crate::parser::normalize::{
11    normalize_ident, normalize_object_name_checked, table_name_from_factor,
12};
13use crate::types::{
14    ArrayCatalogView, CollectionInfo, ColumnInfo, EngineType, SqlCatalog, SqlDataType,
15};
16use crate::types_array::{ArrayAttrType, ArrayDimType};
17
18/// Resolved table reference: name, alias, and catalog info.
19#[derive(Debug, Clone)]
20pub struct ResolvedTable {
21    pub name: String,
22    pub alias: Option<String>,
23    pub info: CollectionInfo,
24}
25
26impl ResolvedTable {
27    /// The name to use for qualified column references.
28    pub fn ref_name(&self) -> &str {
29        self.alias.as_deref().unwrap_or(&self.name)
30    }
31}
32
33/// Context built during FROM clause resolution.
34#[derive(Debug, Default)]
35pub struct TableScope {
36    /// Tables by reference name (alias or table name).
37    pub tables: HashMap<String, ResolvedTable>,
38    /// Insertion order for unambiguous column resolution.
39    order: Vec<String>,
40}
41
42impl TableScope {
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Add a resolved table. Returns error if name conflicts.
48    pub fn add(&mut self, table: ResolvedTable) -> Result<()> {
49        let key = table.ref_name().to_string();
50        if self.tables.contains_key(&key) {
51            return Err(SqlError::Parse {
52                detail: format!("duplicate table reference: {key}"),
53            });
54        }
55        self.order.push(key.clone());
56        self.tables.insert(key, table);
57        Ok(())
58    }
59
60    /// Resolve a column name, optionally qualified with a table reference.
61    ///
62    /// For schemaless collections, any column is accepted (dynamic fields).
63    /// For typed collections, the column must exist in the schema.
64    pub fn resolve_column(
65        &self,
66        table_ref: Option<&str>,
67        column: &str,
68    ) -> Result<(String, String)> {
69        let col = column.to_lowercase();
70
71        if let Some(tref) = table_ref {
72            let tref_lower = tref.to_lowercase();
73            let table = self
74                .tables
75                .get(&tref_lower)
76                .ok_or_else(|| SqlError::UnknownTable {
77                    name: tref_lower.clone(),
78                })?;
79            self.validate_column(table, &col)?;
80            return Ok((table.name.clone(), col));
81        }
82
83        // Unqualified: search all tables.
84        let mut matches = Vec::new();
85        for key in &self.order {
86            let table = &self.tables[key];
87            if self.column_exists(table, &col) {
88                matches.push(table.name.clone());
89            }
90        }
91
92        match matches.len() {
93            0 => {
94                // For single-table queries with schemaless, accept anything.
95                if self.tables.len() == 1 {
96                    let table = self
97                        .tables
98                        .values()
99                        .next()
100                        .expect("invariant: self.tables.len() == 1 checked immediately above");
101                    if table.info.engine == EngineType::DocumentSchemaless {
102                        return Ok((table.name.clone(), col));
103                    }
104                }
105                Err(SqlError::UnknownColumn {
106                    table: self
107                        .order
108                        .first()
109                        .cloned()
110                        .unwrap_or_else(|| "<unknown>".into()),
111                    column: col,
112                })
113            }
114            1 => Ok((
115                matches
116                    .into_iter()
117                    .next()
118                    .expect("invariant: matches.len() == 1 guaranteed by this match arm"),
119                col,
120            )),
121            _ => Err(SqlError::AmbiguousColumn { column: col }),
122        }
123    }
124
125    fn column_exists(&self, table: &ResolvedTable, column: &str) -> bool {
126        // Schemaless accepts any column.
127        if table.info.engine == EngineType::DocumentSchemaless {
128            return true;
129        }
130        table.info.columns.iter().any(|c| c.name == column)
131    }
132
133    fn validate_column(&self, table: &ResolvedTable, column: &str) -> Result<()> {
134        if self.column_exists(table, column) {
135            Ok(())
136        } else {
137            Err(SqlError::UnknownColumn {
138                table: table.name.clone(),
139                column: column.into(),
140            })
141        }
142    }
143
144    /// Get the single table in scope (for single-table queries).
145    pub fn single_table(&self) -> Option<&ResolvedTable> {
146        if self.tables.len() == 1 {
147            self.tables.values().next()
148        } else {
149            Option::None
150        }
151    }
152
153    /// Resolve tables from a FROM clause.
154    pub fn resolve_from(
155        catalog: &dyn SqlCatalog,
156        from: &[sqlparser::ast::TableWithJoins],
157    ) -> Result<Self> {
158        let mut scope = Self::new();
159        for table_with_joins in from {
160            scope.resolve_table_factor(catalog, &table_with_joins.relation)?;
161            for join in &table_with_joins.joins {
162                scope.resolve_table_factor(catalog, &join.relation)?;
163            }
164        }
165        Ok(scope)
166    }
167
168    fn resolve_table_factor(
169        &mut self,
170        catalog: &dyn SqlCatalog,
171        factor: &sqlparser::ast::TableFactor,
172    ) -> Result<()> {
173        // ARRAY_*(...) table-valued function: synthesize a ResolvedTable
174        // from the array's dim+attr schema so equi-join keys against the
175        // TVF's output rows resolve.
176        if let Some(resolved) = resolve_array_tvf(catalog, factor)? {
177            self.add(resolved)?;
178            return Ok(());
179        }
180        // LATERAL derived subquery: register the alias as a schemaless
181        // collection so qualified column references (`alias.col`) resolve
182        // without a catalog lookup. The actual inner plan is built separately.
183        if let sqlparser::ast::TableFactor::Derived {
184            lateral: true,
185            alias: Some(alias),
186            ..
187        } = factor
188        {
189            let alias_str = normalize_ident(&alias.name);
190            self.add(ResolvedTable {
191                name: alias_str.clone(),
192                alias: Some(alias_str.clone()),
193                info: CollectionInfo {
194                    name: alias_str,
195                    engine: EngineType::DocumentSchemaless,
196                    columns: Vec::new(),
197                    primary_key: None,
198                    has_auto_tier: false,
199                    indexes: Vec::new(),
200                    bitemporal: false,
201                    primary: nodedb_types::PrimaryEngine::Document,
202                    vector_primary: None,
203                },
204            })?;
205            return Ok(());
206        }
207        if let Some((name, alias)) = table_name_from_factor(factor)? {
208            let info = catalog
209                .get_collection(DatabaseId::DEFAULT, &name)?
210                .ok_or_else(|| SqlError::UnknownTable { name: name.clone() })?;
211            self.add(ResolvedTable { name, alias, info })?;
212        }
213        Ok(())
214    }
215}
216
217/// If `factor` is `ARRAY_*(name, ...)`, look up the array via the
218/// catalog and build a `ResolvedTable` whose columns mirror the array's
219/// dims + attrs. Returns `Ok(None)` for any non-array-TVF factor.
220fn resolve_array_tvf(
221    catalog: &dyn SqlCatalog,
222    factor: &sqlparser::ast::TableFactor,
223) -> Result<Option<ResolvedTable>> {
224    let (fn_name, args, alias) = match factor {
225        sqlparser::ast::TableFactor::Table {
226            name,
227            args: Some(args),
228            alias,
229            ..
230        } => (
231            normalize_object_name_checked(name)?,
232            args,
233            alias.as_ref().map(|a| normalize_ident(&a.name)),
234        ),
235        _ => return Ok(None),
236    };
237    if !matches!(
238        fn_name.as_str(),
239        "array_slice" | "array_project" | "array_agg" | "array_elementwise"
240    ) {
241        return Ok(None);
242    }
243
244    // First positional arg is the array name as a string literal.
245    let first = args.args.first().ok_or_else(|| SqlError::Unsupported {
246        detail: format!("{fn_name}: missing array-name argument"),
247    })?;
248    let array_name = extract_string_literal_arg(first).ok_or_else(|| SqlError::Unsupported {
249        detail: format!("{fn_name}: array-name argument must be a string literal"),
250    })?;
251    let view = catalog
252        .lookup_array(&array_name)
253        .ok_or_else(|| SqlError::UnknownTable {
254            name: array_name.clone(),
255        })?;
256
257    let info = CollectionInfo {
258        name: view.name.clone(),
259        engine: EngineType::Array,
260        columns: array_columns(&view),
261        primary_key: None,
262        has_auto_tier: false,
263        indexes: Vec::new(),
264        bitemporal: false,
265        primary: nodedb_types::PrimaryEngine::Document,
266        vector_primary: None,
267    };
268    Ok(Some(ResolvedTable {
269        name: view.name,
270        alias,
271        info,
272    }))
273}
274
275fn array_columns(view: &ArrayCatalogView) -> Vec<ColumnInfo> {
276    let mut cols = Vec::with_capacity(view.dims.len() + view.attrs.len());
277    for d in &view.dims {
278        cols.push(ColumnInfo {
279            name: d.name.clone(),
280            data_type: dim_type_to_sql(d.dtype),
281            nullable: false,
282            is_primary_key: false,
283            default: None,
284            raw_type: None,
285        });
286    }
287    for a in &view.attrs {
288        cols.push(ColumnInfo {
289            name: a.name.clone(),
290            data_type: attr_type_to_sql(a.dtype),
291            nullable: a.nullable,
292            is_primary_key: false,
293            default: None,
294            raw_type: None,
295        });
296    }
297    cols
298}
299
300fn dim_type_to_sql(t: ArrayDimType) -> SqlDataType {
301    match t {
302        ArrayDimType::Int64 => SqlDataType::Int64,
303        ArrayDimType::Float64 => SqlDataType::Float64,
304        ArrayDimType::TimestampMs => SqlDataType::Timestamp,
305        ArrayDimType::String => SqlDataType::String,
306    }
307}
308
309fn attr_type_to_sql(t: ArrayAttrType) -> SqlDataType {
310    match t {
311        ArrayAttrType::Int64 => SqlDataType::Int64,
312        ArrayAttrType::Float64 => SqlDataType::Float64,
313        ArrayAttrType::String => SqlDataType::String,
314        ArrayAttrType::Bytes => SqlDataType::Bytes,
315    }
316}
317
318fn extract_string_literal_arg(arg: &sqlparser::ast::FunctionArg) -> Option<String> {
319    use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
320    let expr = match arg {
321        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
322        FunctionArg::Named {
323            arg: FunctionArgExpr::Expr(e),
324            ..
325        } => e,
326        _ => return None,
327    };
328    match expr {
329        Expr::Value(v) => match &v.value {
330            Value::SingleQuotedString(s) => Some(s.clone()),
331            _ => None,
332        },
333        _ => None,
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{CollectionInfo, ColumnInfo, EngineType, SqlDataType};
    use nodedb_types::PrimaryEngine;

    /// A nullable, non-key string column with the given name.
    fn string_column(name: &str) -> ColumnInfo {
        ColumnInfo {
            name: name.into(),
            data_type: SqlDataType::String,
            nullable: true,
            is_primary_key: false,
            default: None,
            raw_type: None,
        }
    }

    /// A document collection with the given engine and columns; every
    /// other field takes a fixed fixture value.
    fn collection(name: &str, engine: EngineType, columns: Vec<ColumnInfo>) -> CollectionInfo {
        CollectionInfo {
            name: name.into(),
            engine,
            columns,
            primary_key: None,
            has_auto_tier: false,
            indexes: Vec::new(),
            bitemporal: false,
            primary: PrimaryEngine::Document,
            vector_primary: None,
        }
    }

    /// A strict-schema collection whose columns are all nullable strings.
    fn strict_collection(name: &str, columns: Vec<&str>) -> CollectionInfo {
        let columns = columns.into_iter().map(string_column).collect();
        collection(name, EngineType::DocumentStrict, columns)
    }

    /// A schemaless collection with no declared columns.
    fn schemaless_collection(name: &str) -> CollectionInfo {
        collection(name, EngineType::DocumentSchemaless, Vec::new())
    }

    /// A scope holding exactly one un-aliased table built from `info`.
    fn scope_with(info: CollectionInfo) -> TableScope {
        let table = ResolvedTable {
            name: info.name.clone(),
            alias: None,
            info,
        };
        let mut scope = TableScope::new();
        scope.add(table).expect("add failed");
        scope
    }

    /// An unqualified column declared in a strict schema resolves to its
    /// table. The name is passed in lowercase here, standing in for a
    /// double-quoted identifier such as `"userId"` after it has been
    /// lowered on its way to `TableScope`; the schema stores the column
    /// as `userid`, so that is the resolved name.
    ///
    /// This test confirms the resolution path, not just `convert_expr`.
    #[test]
    fn quoted_identifier_resolves_as_column() {
        let scope = scope_with(strict_collection("users", vec!["userid", "email"]));
        let resolved = scope.resolve_column(None, "userid");
        let (table, col) = resolved.expect("should resolve");
        assert_eq!(table, "users");
        assert_eq!(col, "userid");
    }

    /// A column missing from a strict collection must surface as
    /// `SqlError::UnknownColumn` carrying the column name, and must not be
    /// reported as `SqlError::Unsupported`.
    #[test]
    fn unknown_column_in_strict_collection_yields_unknown_column_error() {
        let scope = scope_with(strict_collection("users", vec!["id", "email"]));
        let outcome = scope.resolve_column(None, "ghost_col");
        let err = outcome.expect_err("should fail for unknown column");
        assert!(
            matches!(err, SqlError::UnknownColumn { ref column, .. } if column == "ghost_col"),
            "expected UnknownColumn(ghost_col), got {err:?}"
        );
        // Must NOT be Unsupported — that would be the wrong error variant.
        assert!(
            !matches!(err, SqlError::Unsupported { .. }),
            "must not surface Unsupported for a missing column"
        );
    }

    /// A schemaless collection accepts a column it has never declared.
    #[test]
    fn any_column_accepted_in_schemaless_collection() {
        let scope = scope_with(schemaless_collection("events"));
        let resolved = scope.resolve_column(None, "ghost_col");
        let (table, col) = resolved.expect("schemaless should accept any column");
        assert_eq!(table, "events");
        assert_eq!(col, "ghost_col");
    }

    /// Qualified column reference: `"t"."col"` → table `t`, column `col`.
    #[test]
    fn qualified_column_resolves_correctly() {
        let scope = scope_with(strict_collection("t", vec!["col", "other"]));
        let resolved = scope.resolve_column(Some("t"), "col");
        let (table, col) = resolved.expect("qualified column should resolve");
        assert_eq!(table, "t");
        assert_eq!(col, "col");
    }

    /// A qualified reference to a column missing from a strict collection
    /// must yield `SqlError::UnknownColumn`, not `Unsupported`.
    #[test]
    fn qualified_unknown_column_in_strict_collection() {
        let scope = scope_with(strict_collection("t", vec!["id"]));
        let outcome = scope.resolve_column(Some("t"), "missing");
        let err = outcome.expect_err("should fail");
        assert!(
            matches!(err, SqlError::UnknownColumn { .. }),
            "expected UnknownColumn, got {err:?}"
        );
    }
}