Skip to main content

nodedb_sql/resolver/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Column and table resolution against the catalog.
4
5use std::collections::HashMap;
6
7use nodedb_types::DatabaseId;
8
9use crate::error::{Result, SqlError};
10use crate::parser::normalize::{
11    normalize_ident, normalize_object_name_checked, table_name_from_factor,
12};
13use crate::types::{
14    ArrayCatalogView, CollectionInfo, ColumnInfo, EngineType, SqlCatalog, SqlDataType,
15};
16use crate::types_array::{ArrayAttrType, ArrayDimType};
17
18/// Resolved table reference: name, alias, and catalog info.
19#[derive(Debug, Clone)]
20pub struct ResolvedTable {
21    pub name: String,
22    pub alias: Option<String>,
23    pub info: CollectionInfo,
24}
25
26impl ResolvedTable {
27    /// The name to use for qualified column references.
28    pub fn ref_name(&self) -> &str {
29        self.alias.as_deref().unwrap_or(&self.name)
30    }
31}
32
33/// Context built during FROM clause resolution.
34#[derive(Debug, Default)]
35pub struct TableScope {
36    /// Tables by reference name (alias or table name).
37    pub tables: HashMap<String, ResolvedTable>,
38    /// Insertion order for unambiguous column resolution.
39    order: Vec<String>,
40}
41
42impl TableScope {
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Add a resolved table. Returns error if name conflicts.
48    pub fn add(&mut self, table: ResolvedTable) -> Result<()> {
49        let key = table.ref_name().to_string();
50        if self.tables.contains_key(&key) {
51            return Err(SqlError::Parse {
52                detail: format!("duplicate table reference: {key}"),
53            });
54        }
55        self.order.push(key.clone());
56        self.tables.insert(key, table);
57        Ok(())
58    }
59
60    /// Resolve a column name, optionally qualified with a table reference.
61    ///
62    /// For schemaless collections, any column is accepted (dynamic fields).
63    /// For typed collections, the column must exist in the schema.
64    pub fn resolve_column(
65        &self,
66        table_ref: Option<&str>,
67        column: &str,
68    ) -> Result<(String, String)> {
69        let col = column.to_lowercase();
70
71        if let Some(tref) = table_ref {
72            let tref_lower = tref.to_lowercase();
73            let table = self
74                .tables
75                .get(&tref_lower)
76                .ok_or_else(|| SqlError::UnknownTable {
77                    name: tref_lower.clone(),
78                })?;
79            self.validate_column(table, &col)?;
80            return Ok((table.name.clone(), col));
81        }
82
83        // Unqualified: search all tables.
84        let mut matches = Vec::new();
85        for key in &self.order {
86            let table = &self.tables[key];
87            if self.column_exists(table, &col) {
88                matches.push(table.name.clone());
89            }
90        }
91
92        match matches.len() {
93            0 => {
94                // For single-table queries with schemaless, accept anything.
95                if self.tables.len() == 1 {
96                    let table = self
97                        .tables
98                        .values()
99                        .next()
100                        .expect("invariant: self.tables.len() == 1 checked immediately above");
101                    if table.info.engine == EngineType::DocumentSchemaless {
102                        return Ok((table.name.clone(), col));
103                    }
104                }
105                Err(SqlError::UnknownColumn {
106                    table: self
107                        .order
108                        .first()
109                        .cloned()
110                        .unwrap_or_else(|| "<unknown>".into()),
111                    column: col,
112                })
113            }
114            1 => Ok((
115                matches
116                    .into_iter()
117                    .next()
118                    .expect("invariant: matches.len() == 1 guaranteed by this match arm"),
119                col,
120            )),
121            _ => Err(SqlError::AmbiguousColumn { column: col }),
122        }
123    }
124
125    fn column_exists(&self, table: &ResolvedTable, column: &str) -> bool {
126        // Schemaless accepts any column.
127        if table.info.engine == EngineType::DocumentSchemaless {
128            return true;
129        }
130        table.info.columns.iter().any(|c| c.name == column)
131    }
132
133    fn validate_column(&self, table: &ResolvedTable, column: &str) -> Result<()> {
134        if self.column_exists(table, column) {
135            Ok(())
136        } else {
137            Err(SqlError::UnknownColumn {
138                table: table.name.clone(),
139                column: column.into(),
140            })
141        }
142    }
143
144    /// Get the single table in scope (for single-table queries).
145    pub fn single_table(&self) -> Option<&ResolvedTable> {
146        if self.tables.len() == 1 {
147            self.tables.values().next()
148        } else {
149            Option::None
150        }
151    }
152
153    /// Resolve tables from a FROM clause.
154    pub fn resolve_from(
155        catalog: &dyn SqlCatalog,
156        from: &[sqlparser::ast::TableWithJoins],
157    ) -> Result<Self> {
158        let mut scope = Self::new();
159        for table_with_joins in from {
160            scope.resolve_table_factor(catalog, &table_with_joins.relation)?;
161            for join in &table_with_joins.joins {
162                scope.resolve_table_factor(catalog, &join.relation)?;
163            }
164        }
165        Ok(scope)
166    }
167
168    fn resolve_table_factor(
169        &mut self,
170        catalog: &dyn SqlCatalog,
171        factor: &sqlparser::ast::TableFactor,
172    ) -> Result<()> {
173        // ARRAY_*(...) table-valued function: synthesize a ResolvedTable
174        // from the array's dim+attr schema so equi-join keys against the
175        // TVF's output rows resolve.
176        if let Some(resolved) = resolve_array_tvf(catalog, factor)? {
177            self.add(resolved)?;
178            return Ok(());
179        }
180        // LATERAL derived subquery: register the alias as a schemaless
181        // collection so qualified column references (`alias.col`) resolve
182        // without a catalog lookup. The actual inner plan is built separately.
183        if let sqlparser::ast::TableFactor::Derived {
184            lateral: true,
185            alias: Some(alias),
186            ..
187        } = factor
188        {
189            let alias_str = normalize_ident(&alias.name);
190            self.add(ResolvedTable {
191                name: alias_str.clone(),
192                alias: Some(alias_str.clone()),
193                info: CollectionInfo {
194                    name: alias_str,
195                    engine: EngineType::DocumentSchemaless,
196                    columns: Vec::new(),
197                    primary_key: None,
198                    has_auto_tier: false,
199                    indexes: Vec::new(),
200                    bitemporal: false,
201                    primary: nodedb_types::PrimaryEngine::Document,
202                    vector_primary: None,
203                },
204            })?;
205            return Ok(());
206        }
207        if let Some((name, alias)) = table_name_from_factor(factor)? {
208            let info = catalog
209                .get_collection(DatabaseId::DEFAULT, &name)?
210                .ok_or_else(|| SqlError::UnknownTable { name: name.clone() })?;
211            self.add(ResolvedTable { name, alias, info })?;
212        }
213        Ok(())
214    }
215}
216
217/// If `factor` is `ARRAY_*(name, ...)`, look up the array via the
218/// catalog and build a `ResolvedTable` whose columns mirror the array's
219/// dims + attrs. Returns `Ok(None)` for any non-array-TVF factor.
220fn resolve_array_tvf(
221    catalog: &dyn SqlCatalog,
222    factor: &sqlparser::ast::TableFactor,
223) -> Result<Option<ResolvedTable>> {
224    let (fn_name, args, alias) = match factor {
225        sqlparser::ast::TableFactor::Table {
226            name,
227            args: Some(args),
228            alias,
229            ..
230        } => (
231            normalize_object_name_checked(name)?,
232            args,
233            alias.as_ref().map(|a| normalize_ident(&a.name)),
234        ),
235        _ => return Ok(None),
236    };
237    if !matches!(
238        fn_name.as_str(),
239        "array_slice" | "array_project" | "array_agg" | "array_elementwise"
240    ) {
241        return Ok(None);
242    }
243
244    // First positional arg is the array name as a string literal.
245    let first = args.args.first().ok_or_else(|| SqlError::Unsupported {
246        detail: format!("{fn_name}: missing array-name argument"),
247    })?;
248    let array_name = extract_string_literal_arg(first).ok_or_else(|| SqlError::Unsupported {
249        detail: format!("{fn_name}: array-name argument must be a string literal"),
250    })?;
251    let view = catalog
252        .lookup_array(&array_name)
253        .ok_or_else(|| SqlError::UnknownTable {
254            name: array_name.clone(),
255        })?;
256
257    let info = CollectionInfo {
258        name: view.name.clone(),
259        engine: EngineType::Array,
260        columns: array_columns(&view),
261        primary_key: None,
262        has_auto_tier: false,
263        indexes: Vec::new(),
264        bitemporal: false,
265        primary: nodedb_types::PrimaryEngine::Document,
266        vector_primary: None,
267    };
268    Ok(Some(ResolvedTable {
269        name: view.name,
270        alias,
271        info,
272    }))
273}
274
275fn array_columns(view: &ArrayCatalogView) -> Vec<ColumnInfo> {
276    let mut cols = Vec::with_capacity(view.dims.len() + view.attrs.len());
277    for d in &view.dims {
278        cols.push(ColumnInfo {
279            name: d.name.clone(),
280            data_type: dim_type_to_sql(d.dtype),
281            nullable: false,
282            is_primary_key: false,
283            default: None,
284        });
285    }
286    for a in &view.attrs {
287        cols.push(ColumnInfo {
288            name: a.name.clone(),
289            data_type: attr_type_to_sql(a.dtype),
290            nullable: a.nullable,
291            is_primary_key: false,
292            default: None,
293        });
294    }
295    cols
296}
297
298fn dim_type_to_sql(t: ArrayDimType) -> SqlDataType {
299    match t {
300        ArrayDimType::Int64 => SqlDataType::Int64,
301        ArrayDimType::Float64 => SqlDataType::Float64,
302        ArrayDimType::TimestampMs => SqlDataType::Timestamp,
303        ArrayDimType::String => SqlDataType::String,
304    }
305}
306
307fn attr_type_to_sql(t: ArrayAttrType) -> SqlDataType {
308    match t {
309        ArrayAttrType::Int64 => SqlDataType::Int64,
310        ArrayAttrType::Float64 => SqlDataType::Float64,
311        ArrayAttrType::String => SqlDataType::String,
312        ArrayAttrType::Bytes => SqlDataType::Bytes,
313    }
314}
315
316fn extract_string_literal_arg(arg: &sqlparser::ast::FunctionArg) -> Option<String> {
317    use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
318    let expr = match arg {
319        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
320        FunctionArg::Named {
321            arg: FunctionArgExpr::Expr(e),
322            ..
323        } => e,
324        _ => return None,
325    };
326    match expr {
327        Expr::Value(v) => match &v.value {
328            Value::SingleQuotedString(s) => Some(s.clone()),
329            _ => None,
330        },
331        _ => None,
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{CollectionInfo, ColumnInfo, EngineType, SqlDataType};
    use nodedb_types::PrimaryEngine;

    /// Strict-schema collection whose columns are all nullable strings.
    fn strict_collection(name: &str, columns: Vec<&str>) -> CollectionInfo {
        CollectionInfo {
            name: name.into(),
            engine: EngineType::DocumentStrict,
            columns: columns
                .into_iter()
                .map(|c| ColumnInfo {
                    name: c.into(),
                    data_type: SqlDataType::String,
                    nullable: true,
                    is_primary_key: false,
                    default: None,
                })
                .collect(),
            primary_key: None,
            has_auto_tier: false,
            indexes: Vec::new(),
            bitemporal: false,
            primary: PrimaryEngine::Document,
            vector_primary: None,
        }
    }

    /// Schemaless collection with no declared columns.
    fn schemaless_collection(name: &str) -> CollectionInfo {
        CollectionInfo {
            name: name.into(),
            engine: EngineType::DocumentSchemaless,
            columns: Vec::new(),
            primary_key: None,
            has_auto_tier: false,
            indexes: Vec::new(),
            bitemporal: false,
            primary: PrimaryEngine::Document,
            vector_primary: None,
        }
    }

    /// Wrap a collection as an un-aliased table reference.
    fn unaliased(info: CollectionInfo) -> ResolvedTable {
        ResolvedTable {
            name: info.name.clone(),
            alias: None,
            info,
        }
    }

    /// Scope containing exactly one un-aliased table.
    fn scope_with(info: CollectionInfo) -> TableScope {
        let mut scope = TableScope::new();
        scope.add(unaliased(info)).expect("add failed");
        scope
    }

    /// `resolve_column` folds the column name to lowercase before lookup,
    /// so both an already-lowercase name and a mixed-case spelling such as
    /// `userId` resolve against the lowercase schema column `userid`.
    ///
    /// This test confirms the resolution path, not just `convert_expr`.
    #[test]
    fn quoted_identifier_resolves_as_column() {
        let scope = scope_with(strict_collection("users", vec!["userid", "email"]));

        let (table, col) = scope
            .resolve_column(None, "userid")
            .expect("should resolve");
        assert_eq!(table, "users");
        assert_eq!(col, "userid");

        let (table, col) = scope
            .resolve_column(None, "userId")
            .expect("mixed-case spelling should resolve");
        assert_eq!(table, "users");
        assert_eq!(col, "userid");
    }

    /// An unrecognized column in a strict collection must yield
    /// `SqlError::UnknownColumn` (and therefore not `SqlError::Unsupported`).
    #[test]
    fn unknown_column_in_strict_collection_yields_unknown_column_error() {
        let scope = scope_with(strict_collection("users", vec!["id", "email"]));
        let err = scope
            .resolve_column(None, "ghost_col")
            .expect_err("should fail for unknown column");
        assert!(
            matches!(err, SqlError::UnknownColumn { ref column, .. } if column == "ghost_col"),
            "expected UnknownColumn(ghost_col), got {err:?}"
        );
    }

    /// Schemaless collections accept any column, including ones that look
    /// like they could be misidentified double-quoted identifiers.
    #[test]
    fn any_column_accepted_in_schemaless_collection() {
        let scope = scope_with(schemaless_collection("events"));
        let (table, col) = scope
            .resolve_column(None, "ghost_col")
            .expect("schemaless should accept any column");
        assert_eq!(table, "events");
        assert_eq!(col, "ghost_col");
    }

    /// Qualified column reference: `"t"."col"` → table `t`, column `col`.
    #[test]
    fn qualified_column_resolves_correctly() {
        let scope = scope_with(strict_collection("t", vec!["col", "other"]));
        let (table, col) = scope
            .resolve_column(Some("t"), "col")
            .expect("qualified column should resolve");
        assert_eq!(table, "t");
        assert_eq!(col, "col");
    }

    /// Qualified reference to an unknown column in a strict collection must
    /// yield `SqlError::UnknownColumn`, not `Unsupported`.
    #[test]
    fn qualified_unknown_column_in_strict_collection() {
        let scope = scope_with(strict_collection("t", vec!["id"]));
        let err = scope
            .resolve_column(Some("t"), "missing")
            .expect_err("should fail");
        assert!(
            matches!(err, SqlError::UnknownColumn { .. }),
            "expected UnknownColumn, got {err:?}"
        );
    }

    /// A qualifier that names no table in scope yields `UnknownTable`.
    #[test]
    fn unknown_qualifier_yields_unknown_table() {
        let scope = scope_with(strict_collection("t", vec!["id"]));
        let err = scope
            .resolve_column(Some("nope"), "id")
            .expect_err("should fail");
        assert!(
            matches!(err, SqlError::UnknownTable { ref name } if name == "nope"),
            "expected UnknownTable(nope), got {err:?}"
        );
    }

    /// An unqualified column present in two tables is ambiguous, while one
    /// present in only a single table resolves to that table.
    #[test]
    fn unqualified_column_across_two_tables() {
        let mut scope = TableScope::new();
        scope
            .add(unaliased(strict_collection("users", vec!["id", "email"])))
            .expect("add users");
        scope
            .add(unaliased(strict_collection("orders", vec!["id", "total"])))
            .expect("add orders");

        let err = scope
            .resolve_column(None, "id")
            .expect_err("shared column should be ambiguous");
        assert!(
            matches!(err, SqlError::AmbiguousColumn { ref column } if column == "id"),
            "expected AmbiguousColumn(id), got {err:?}"
        );

        let (table, col) = scope
            .resolve_column(None, "total")
            .expect("unique column should resolve");
        assert_eq!(table, "orders");
        assert_eq!(col, "total");
    }

    /// Adding two tables under the same reference name is rejected.
    #[test]
    fn duplicate_reference_name_is_rejected() {
        let mut scope = scope_with(strict_collection("t", vec!["id"]));
        let err = scope
            .add(unaliased(strict_collection("t", vec!["id"])))
            .expect_err("second add should fail");
        assert!(
            matches!(err, SqlError::Parse { .. }),
            "expected Parse, got {err:?}"
        );
    }

    /// With an alias, the alias is the only valid qualifier, and the
    /// resolved table is still reported by its real name.
    #[test]
    fn alias_is_the_reference_name() {
        let mut scope = TableScope::new();
        let info = strict_collection("users", vec!["id"]);
        scope
            .add(ResolvedTable {
                name: info.name.clone(),
                alias: Some("u".into()),
                info,
            })
            .expect("add failed");

        let (table, col) = scope
            .resolve_column(Some("u"), "id")
            .expect("alias qualifier should resolve");
        assert_eq!(table, "users");
        assert_eq!(col, "id");

        let err = scope
            .resolve_column(Some("users"), "id")
            .expect_err("table name is not a qualifier once aliased");
        assert!(
            matches!(err, SqlError::UnknownTable { .. }),
            "expected UnknownTable, got {err:?}"
        );
    }
}
465}