vibesql_executor/
schema.rs

1use std::{
2    borrow::Borrow,
3    collections::{HashMap, HashSet},
4    fmt,
5    hash::{Hash, Hasher},
6    ops::Deref,
7};
8
9use vibesql_catalog::TableIdentifier;
10
11/// A normalized table/alias key for case-insensitive lookups.
12/// Always stored as lowercase, making case-insensitive handling impossible to get wrong.
13#[derive(Debug, Clone, Eq)]
14pub struct TableKey(String);
15
16impl TableKey {
17    /// Create a new TableKey, normalizing to lowercase.
18    #[inline]
19    pub fn new(name: impl AsRef<str>) -> Self {
20        TableKey(name.as_ref().to_lowercase())
21    }
22
23    /// Get the normalized key as a string slice.
24    #[inline]
25    pub fn as_str(&self) -> &str {
26        &self.0
27    }
28
29    /// Consume the TableKey and return the inner String.
30    #[inline]
31    pub fn into_inner(self) -> String {
32        self.0
33    }
34}
35
36impl PartialEq for TableKey {
37    fn eq(&self, other: &Self) -> bool {
38        self.0 == other.0
39    }
40}
41
42impl Hash for TableKey {
43    fn hash<H: Hasher>(&self, state: &mut H) {
44        self.0.hash(state);
45    }
46}
47
48impl Deref for TableKey {
49    type Target = str;
50
51    fn deref(&self) -> &Self::Target {
52        &self.0
53    }
54}
55
56impl AsRef<str> for TableKey {
57    fn as_ref(&self) -> &str {
58        &self.0
59    }
60}
61
62impl Borrow<str> for TableKey {
63    fn borrow(&self) -> &str {
64        &self.0
65    }
66}
67
68impl fmt::Display for TableKey {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        write!(f, "{}", self.0)
71    }
72}
73
74impl From<String> for TableKey {
75    fn from(s: String) -> Self {
76        TableKey::new(s)
77    }
78}
79
80impl From<&str> for TableKey {
81    fn from(s: &str) -> Self {
82        TableKey::new(s)
83    }
84}
85
86impl From<TableKey> for String {
87    fn from(key: TableKey) -> Self {
88        key.0
89    }
90}
91
92impl From<&TableKey> for TableKey {
93    fn from(key: &TableKey) -> Self {
94        key.clone()
95    }
96}
97
98impl From<&String> for TableKey {
99    fn from(s: &String) -> Self {
100        TableKey::new(s)
101    }
102}
103
104/// Represents the combined schema from multiple tables (for JOINs)
105#[derive(Debug, Clone)]
106pub struct CombinedSchema {
107    /// Map from table identifier to (start_index, TableSchema)
108    /// start_index is where this table's columns begin in the combined row
109    /// TableIdentifier provides both canonical form for lookups and display form for output
110    pub table_schemas: HashMap<TableIdentifier, (usize, vibesql_catalog::TableSchema)>,
111    /// Total number of columns across all tables
112    pub total_columns: usize,
113    /// Columns that are hidden from `SELECT *` expansion due to NATURAL JOIN deduplication.
114    /// These columns are still accessible via qualified references like `table.*`.
115    /// This allows `SELECT t1.*` to return all columns from t1, while `SELECT *`
116    /// correctly deduplicates columns for NATURAL JOIN.
117    pub hidden_columns: HashSet<usize>,
118    /// Reference to outer scope schema for nested subquery column resolution (issue #4493)
119    /// Forms a linked-list chain similar to SQLite's NameContext.pNext
120    /// Enables resolution of columns from multiple nesting levels
121    pub outer_schema: Option<Box<CombinedSchema>>,
122    /// Table aliases/names that appear more than once in the FROM clause (issue #4507)
123    /// Used to detect ambiguous qualified column references like "A.f1" when table "A" appears
124    /// twice Stores normalized (lowercase) table identifiers for case-insensitive matching
125    pub duplicate_aliases: HashSet<TableIdentifier>,
126    /// Column names that have been joined via NATURAL JOIN or USING clause (issue #4517)
127    /// These columns exist in multiple tables but should NOT be considered ambiguous
128    /// because they are logically the same column after the join.
129    /// Stored as lowercase for case-insensitive matching.
130    pub joined_columns: HashSet<String>,
131    /// For USING columns in RIGHT/FULL OUTER JOINs, maps the unqualified column name
132    /// (lowercase) to a list of column indices for N-way COALESCE resolution.
133    /// When an unqualified reference is made to a USING column, we apply COALESCE
134    /// semantics: return the first non-NULL value from the list of columns.
135    /// This supports chained joins like `t1 NATURAL FULL JOIN t2 NATURAL FULL JOIN t3`
136    /// where we need COALESCE(t1.id, t2.id, t3.id).
137    /// Issue #4783, #4903: USING column semantics differ from SQLite in OUTER JOINs
138    pub using_coalesce_indices: HashMap<String, Vec<usize>>,
139    /// Column replacement map for RIGHT/FULL OUTER NATURAL JOINs in SELECT * expansion.
140    /// Maps hidden_column_index -> replacement_column_index.
141    /// When expanding SELECT *, if a column is hidden but has a replacement, output
142    /// the replacement column's value instead of skipping. This maintains the column
143    /// ordering from the left table while using values from the right table.
144    /// Example: In `t5 NATURAL RIGHT JOIN t4`, t5.id is hidden but should be replaced
145    /// by t4.id to maintain the output order (id, y, x) instead of (y, id, x).
146    pub column_replacement_map: HashMap<usize, usize>,
147    /// Alias tables that are added for parenthesized join expressions (issue #4905).
148    /// These are virtual tables that point to the same columns as existing tables.
149    /// They exist for column resolution (e.g., `j1.id` in `FROM t1 JOIN (...) AS j1 ON j1.id = t1.id`).
150    /// Stores the table identifiers of alias tables.
151    pub alias_tables: HashSet<TableIdentifier>,
152    /// Tables that are shadowed by an alias table (issue #4786).
153    /// When a parenthesized join has an alias, the underlying tables are shadowed
154    /// and should be skipped in SELECT * expansion. Instead, the alias table's columns
155    /// should be used.
156    /// Maps: aliased table name -> tables shadowed by that alias
157    pub shadowed_tables: HashMap<TableIdentifier, HashSet<TableIdentifier>>,
158}
159
160impl CombinedSchema {
161    /// Create an empty combined schema with no tables
162    ///
163    /// Used for SELECT statements without a FROM clause when the expression
164    /// evaluation needs outer context for column resolution.
165    pub fn empty() -> Self {
166        CombinedSchema {
167            table_schemas: HashMap::new(),
168            total_columns: 0,
169            hidden_columns: HashSet::new(),
170            outer_schema: None,
171            duplicate_aliases: HashSet::new(),
172            joined_columns: HashSet::new(),
173            using_coalesce_indices: HashMap::new(),
174            column_replacement_map: HashMap::new(),
175            alias_tables: HashSet::new(),
176            shadowed_tables: HashMap::new(),
177        }
178    }
179
180    /// Create a new combined schema from a single table
181    ///
182    /// Note: Table name is automatically normalized via TableIdentifier for case-insensitive
183    /// lookups
184    pub fn from_table(table_name: String, schema: vibesql_catalog::TableSchema) -> Self {
185        let total_columns = schema.columns.len();
186        let mut table_schemas = HashMap::new();
187        let table_id = TableIdentifier::unquoted(&table_name);
188        table_schemas.insert(table_id, (0, schema));
189        CombinedSchema {
190            table_schemas,
191            total_columns,
192            hidden_columns: HashSet::new(),
193            outer_schema: None,
194            duplicate_aliases: HashSet::new(),
195            joined_columns: HashSet::new(),
196            using_coalesce_indices: HashMap::new(),
197            column_replacement_map: HashMap::new(),
198            alias_tables: HashSet::new(),
199            shadowed_tables: HashMap::new(),
200        }
201    }
202
203    /// Create a new combined schema from a derived table (subquery result)
204    ///
205    /// Note: Alias is automatically normalized via TableIdentifier for case-insensitive lookups
206    pub fn from_derived_table(
207        alias: String,
208        column_names: Vec<String>,
209        column_types: Vec<vibesql_types::DataType>,
210    ) -> Self {
211        let total_columns = column_names.len();
212
213        // Build column definitions
214        let columns: Vec<vibesql_catalog::ColumnSchema> = column_names
215            .into_iter()
216            .zip(column_types)
217            .map(|(name, data_type)| vibesql_catalog::ColumnSchema {
218                name,
219                data_type,
220                nullable: true,       // Derived table columns are always nullable
221                default_value: None,  // Derived table columns have no defaults
222                generated_expr: None, // Derived table columns are not generated
223                collation: None,      // Derived table columns don't inherit collation
224                is_exact_integer_type: false, // Derived columns don't preserve exact type
225            })
226            .collect();
227
228        let schema = vibesql_catalog::TableSchema::new(alias.clone(), columns);
229        let mut table_schemas = HashMap::new();
230        let table_id = TableIdentifier::unquoted(&alias);
231        table_schemas.insert(table_id, (0, schema));
232        CombinedSchema {
233            table_schemas,
234            total_columns,
235            hidden_columns: HashSet::new(),
236            outer_schema: None,
237            duplicate_aliases: HashSet::new(),
238            joined_columns: HashSet::new(),
239            using_coalesce_indices: HashMap::new(),
240            column_replacement_map: HashMap::new(),
241            alias_tables: HashSet::new(),
242            shadowed_tables: HashMap::new(),
243        }
244    }
245
246    /// Add an alias for a parenthesized join expression.
247    ///
248    /// This is used for expressions like `(t1 JOIN t2) AS j1` where `j1` becomes an
249    /// alias for the combined result. The alias is added as a virtual table containing
250    /// all visible columns, allowing references like `j1.column` to work.
251    ///
252    /// The alias table is marked as "alias-only" and will not be expanded in SELECT *.
253    ///
254    /// **Important**: The alias table uses start_idx=0 and stores the original column
255    /// indices in a mapping, so that `j1.column` resolves to the correct index in the
256    /// actual row data.
257    pub fn add_join_alias(mut self, alias: &str) -> Self {
258        // Build the alias table columns to match what SELECT * would output:
259        // 1. First: USING/NATURAL JOIN columns (from joined_columns)
260        // 2. Then: Other visible columns
261        //
262        // For USING columns, use the first index from using_coalesce_indices
263        // so that j1.id resolves to the correct column position.
264
265        // Collect USING column schemas (from joined_columns)
266        let mut joined_col_entries: Vec<(usize, vibesql_catalog::ColumnSchema)> = Vec::new();
267        for joined_col in &self.joined_columns {
268            // Find the first column with this name (it will be the leftmost in the join)
269            if let Some(indices) = self.using_coalesce_indices.get(joined_col) {
270                if let Some(&first_idx) = indices.first() {
271                    // Find the column schema for this index
272                    for (table_id, (start_idx, table_schema)) in &self.table_schemas {
273                        if self.alias_tables.contains(table_id) {
274                            continue;
275                        }
276                        for (col_idx, col) in table_schema.columns.iter().enumerate() {
277                            let absolute_idx = *start_idx + col_idx;
278                            if absolute_idx == first_idx {
279                                // Use the column name (lowercase matched the joined_col)
280                                // but the index should be the first coalesce index
281                                joined_col_entries.push((first_idx, col.clone()));
282                                break;
283                            }
284                        }
285                    }
286                }
287            }
288        }
289
290        // Collect all visible non-USING columns
291        let mut other_columns: Vec<(usize, vibesql_catalog::ColumnSchema)> = Vec::new();
292        for (table_id, (start_idx, table_schema)) in &self.table_schemas {
293            if self.alias_tables.contains(table_id) {
294                continue;
295            }
296            for (col_idx, col) in table_schema.columns.iter().enumerate() {
297                let absolute_idx = *start_idx + col_idx;
298                // Skip hidden columns
299                if self.hidden_columns.contains(&absolute_idx) {
300                    continue;
301                }
302                // Skip USING columns (already handled above)
303                let is_joined = self.joined_columns.contains(&col.name.to_lowercase());
304                if is_joined {
305                    continue;
306                }
307                other_columns.push((absolute_idx, col.clone()));
308            }
309        }
310
311        // Sort other columns by index
312        other_columns.sort_by_key(|(idx, _)| *idx);
313
314        // Combine: USING columns first (sorted by their index), then other columns
315        joined_col_entries.sort_by_key(|(idx, _)| *idx);
316        let mut all_columns = joined_col_entries;
317        all_columns.extend(other_columns);
318
319        let columns: Vec<vibesql_catalog::ColumnSchema> =
320            all_columns.iter().map(|(_, col)| col.clone()).collect();
321
322        let schema = vibesql_catalog::TableSchema::new(alias.to_string(), columns);
323        let table_id = TableIdentifier::unquoted(alias);
324        // Use start_idx = 0 so column resolution returns indices 0, 1, 2, ...
325        // The order matches the all_columns order, so j1.id maps to index 0 if id is first
326        self.table_schemas.insert(table_id.clone(), (0, schema));
327        self.alias_tables.insert(table_id.clone());
328
329        // Issue #4786: Mark all existing non-alias tables as shadowed by this alias.
330        // This ensures that in SELECT *, the alias table's columns are used instead of
331        // the individual base tables' columns. This is especially important for outer
332        // joins with ON clause, where the aliased join needs to appear as a single table.
333        let shadowed: HashSet<TableIdentifier> = self
334            .table_schemas
335            .keys()
336            .filter(|t| !self.alias_tables.contains(*t) && *t != &table_id)
337            .cloned()
338            .collect();
339        self.shadowed_tables.insert(table_id, shadowed);
340
341        self
342    }
343
344    /// Combine two schemas (for JOIN operations)
345    ///
346    /// Note: Right table name is automatically normalized via TableIdentifier for case-insensitive
347    /// lookups
348    pub fn combine(
349        left: CombinedSchema,
350        right_table_name: String,
351        right_schema: vibesql_catalog::TableSchema,
352    ) -> Self {
353        let mut table_schemas = left.table_schemas;
354        let mut duplicate_aliases = left.duplicate_aliases;
355        let left_total = left.total_columns;
356        let right_columns = right_schema.columns.len();
357        let right_id = TableIdentifier::unquoted(&right_table_name);
358
359        // Track duplicate table alias/name - but NOT for self-joins
360        // A self-join is when the same table (identical schema) is joined to itself.
361        // In SQLite, self-joins like `FROM t1 JOIN t1 USING(a,b)` allow unambiguous
362        // references to `t1.column` because it's the same underlying table.
363        // Only mark as duplicate if it's a different table with the same alias.
364        if let Some((_, existing_schema)) = table_schemas.get(&right_id) {
365            // Check if it's a true self-join (same table) or alias conflict (different tables)
366            // For self-joins, the schemas are identical (same table name, same columns)
367            if existing_schema != &right_schema {
368                // Different tables with same alias - this is ambiguous
369                duplicate_aliases.insert(right_id.clone());
370            }
371            // If same schema, it's a self-join - don't mark as duplicate
372        }
373
374        // Always insert/overwrite the table
375        table_schemas.insert(right_id, (left_total, right_schema));
376        CombinedSchema {
377            table_schemas,
378            total_columns: left_total + right_columns,
379            hidden_columns: left.hidden_columns,
380            outer_schema: left.outer_schema,
381            duplicate_aliases,
382            joined_columns: left.joined_columns,
383            using_coalesce_indices: left.using_coalesce_indices,
384            column_replacement_map: left.column_replacement_map,
385            alias_tables: left.alias_tables,
386            shadowed_tables: left.shadowed_tables,
387        }
388    }
389
390    /// Merge two CombinedSchemas (for JOIN operations with nested joins)
391    ///
392    /// Unlike `combine` which adds a single table, this method merges ALL tables
393    /// from the right schema into the left schema. This is essential for nested
394    /// joins like `t1 JOIN (t2 JOIN t3 USING(a)) USING(a)` where the right side
395    /// contains multiple tables that must all remain visible.
396    ///
397    /// The right schema's tables have their start indices adjusted to account
398    /// for the left schema's total column count.
399    pub fn merge(left: CombinedSchema, right: CombinedSchema) -> Self {
400        let mut table_schemas = left.table_schemas;
401        let mut duplicate_aliases = left.duplicate_aliases;
402        let left_total = left.total_columns;
403
404        // Add all tables from right schema with adjusted start indices
405        for (table_id, (start_index, schema)) in right.table_schemas {
406            let adjusted_start = left_total + start_index;
407
408            // Check if this table already exists in the left schema
409            if let Some((_, existing_schema)) = table_schemas.get(&table_id) {
410                // Only mark as duplicate if it's a different table with the same alias
411                if existing_schema != &schema {
412                    duplicate_aliases.insert(table_id.clone());
413                }
414
415                // For self-joins (same table appearing twice), we need to keep BOTH
416                // entries so that USING/NATURAL join conditions can distinguish between
417                // the left and right instances. Use a synthetic suffix to create a
418                // unique key for the right-side instance.
419                // The synthetic key format "__selfjoin_right_<original_name>_<start_idx>"
420                // ensures uniqueness even for multi-way self-joins.
421                let synthetic_key = TableIdentifier::unquoted(&format!(
422                    "__selfjoin_right_{}_{}",
423                    table_id.canonical(),
424                    adjusted_start
425                ));
426                table_schemas.insert(synthetic_key, (adjusted_start, schema));
427            } else {
428                // No conflict - insert normally
429                table_schemas.insert(table_id, (adjusted_start, schema));
430            }
431        }
432
433        // Merge hidden columns, adjusting right side indices
434        let mut hidden_columns = left.hidden_columns;
435        for idx in right.hidden_columns {
436            hidden_columns.insert(left_total + idx);
437        }
438
439        // Merge duplicate aliases from both sides
440        duplicate_aliases.extend(right.duplicate_aliases);
441
442        // Merge joined columns from both sides
443        let mut joined_columns = left.joined_columns;
444        joined_columns.extend(right.joined_columns);
445
446        // Merge using_coalesce_indices from both sides (adjusting right side indices)
447        // For N-way coalescing, we extend existing Vec entries rather than overwriting
448        let mut using_coalesce_indices = left.using_coalesce_indices;
449        for (col_name, indices) in right.using_coalesce_indices {
450            let adjusted_indices: Vec<usize> = indices.iter().map(|idx| left_total + idx).collect();
451            using_coalesce_indices
452                .entry(col_name)
453                .or_insert_with(Vec::new)
454                .extend(adjusted_indices);
455        }
456
457        // Merge column_replacement_map from both sides (adjusting right side indices)
458        let mut column_replacement_map = left.column_replacement_map;
459        for (hidden_idx, replacement_idx) in right.column_replacement_map {
460            column_replacement_map.insert(left_total + hidden_idx, left_total + replacement_idx);
461        }
462
463        // Merge alias_tables from both sides
464        let mut alias_tables = left.alias_tables;
465        alias_tables.extend(right.alias_tables);
466
467        // Merge shadowed_tables from both sides
468        let mut shadowed_tables = left.shadowed_tables;
469        shadowed_tables.extend(right.shadowed_tables);
470
471        CombinedSchema {
472            table_schemas,
473            total_columns: left_total + right.total_columns,
474            hidden_columns,
475            outer_schema: left.outer_schema,
476            duplicate_aliases,
477            joined_columns,
478            using_coalesce_indices,
479            column_replacement_map,
480            alias_tables,
481            shadowed_tables,
482        }
483    }
484
485    /// Look up a column by name (optionally qualified with table name)
486    /// Uses case-insensitive matching for table/alias and column names
487    ///
488    /// Searches the current schema level first, then follows the outer_schema
489    /// chain to search enclosing scopes (similar to SQLite's NameContext.pNext).
490    /// This enables correlated subqueries to reference columns from outer queries.
491    pub fn get_column_index(&self, table: Option<&str>, column: &str) -> Option<usize> {
492        // Try current level first
493        let current_result = if let Some(table_name) = table {
494            // Qualified column reference (table.column)
495            // TableIdentifier normalizes to lowercase, so lookup is case-insensitive
496            let table_id = TableIdentifier::unquoted(table_name);
497            if let Some((start_index, schema)) = self.table_schemas.get(&table_id) {
498                // Special handling for alias tables (issue #4905)
499                // For alias tables, we need to find the actual column index in the
500                // underlying schema, not use start_index + idx (which would be wrong
501                // since alias tables have start_index=0 but columns are non-contiguous).
502                if self.alias_tables.contains(&table_id) {
503                    // Check if it's a USING column - use coalesce index
504                    let col_lower = column.to_lowercase();
505                    if let Some(indices) = self.using_coalesce_indices.get(&col_lower) {
506                        return indices.first().copied();
507                    }
508                    // Otherwise, find this column in the underlying (non-alias) tables
509                    return self.get_column_index(None, column);
510                }
511                schema.get_column_index(column).map(|idx| start_index + idx)
512            } else {
513                None
514            }
515        } else {
516            // Unqualified column reference - search all tables
517            // IMPORTANT: For LEFT JOINs, we must resolve to the LEFTMOST table
518            // that has the column. Since HashMap iteration order is non-deterministic,
519            // we find ALL matches and pick the one with the lowest start_index.
520            //
521            // Issue #4781: For NATURAL/USING joins, prefer NON-HIDDEN columns.
522            // In RIGHT JOIN USING(a), the left-side `a` is hidden, so we should
523            // pick the right-side `a` (which is not hidden). This ensures
524            // COALESCE semantics where the USING column picks the non-NULL value.
525            let column_lower = column.to_lowercase();
526            let is_joined_column = self.joined_columns.contains(&column_lower);
527
528            let mut best_match: Option<usize> = None;
529            let mut best_match_is_hidden = false;
530
531            for (table_id, (start_index, schema)) in &self.table_schemas {
532                // Skip alias tables for unqualified column resolution (issue #4905)
533                // Alias tables are virtual tables that should only be accessed via
534                // qualified references like `j1.column`, not unqualified references.
535                if self.alias_tables.contains(table_id) {
536                    continue;
537                }
538                if let Some(idx) = schema.get_column_index(column) {
539                    let absolute_idx = start_index + idx;
540                    let is_hidden = self.hidden_columns.contains(&absolute_idx);
541
542                    // For joined columns, prefer non-hidden columns
543                    // For regular columns, prefer leftmost (lowest index) as before
544                    let should_update = match (best_match, is_joined_column) {
545                        (None, _) => true,
546                        // For joined columns: prefer non-hidden over hidden
547                        (Some(_), true) if best_match_is_hidden && !is_hidden => true,
548                        // For all columns: prefer lower index if same hidden status
549                        (Some(current_best), _)
550                            if absolute_idx < current_best
551                                && (!is_joined_column || is_hidden == best_match_is_hidden) =>
552                        {
553                            true
554                        }
555                        _ => false,
556                    };
557
558                    if should_update {
559                        best_match = Some(absolute_idx);
560                        best_match_is_hidden = is_hidden;
561                    }
562                }
563            }
564            best_match
565        };
566
567        // If found at current level, return it
568        if current_result.is_some() {
569            return current_result;
570        }
571
572        // Not found at current level - search outer scopes via chain
573        // This enables nested correlated subqueries to reference columns
574        // from multiple enclosing scopes (issue #4493)
575        if let Some(outer) = &self.outer_schema {
576            return outer.get_column_index(table, column);
577        }
578
579        // Not found anywhere in the chain
580        None
581    }
582
583    /// Get the type affinity for a column by name
584    ///
585    /// Returns the SQLite type affinity for the column, which determines how
586    /// type coercion is performed in comparisons.
587    pub fn get_column_affinity(
588        &self,
589        table: Option<&str>,
590        column: &str,
591    ) -> Option<vibesql_types::TypeAffinity> {
592        if let Some(table_name) = table {
593            // Qualified column reference (table.column)
594            let table_id = TableIdentifier::unquoted(table_name);
595            if let Some((_start_index, schema)) = self.table_schemas.get(&table_id) {
596                if let Some(col_idx) = schema.get_column_index(column) {
597                    return Some(schema.columns[col_idx].data_type.sqlite_affinity());
598                }
599            }
600        } else {
601            // Unqualified column reference - search all tables
602            for (table_id, (_start_index, schema)) in &self.table_schemas {
603                // Skip alias tables for unqualified column resolution
604                if self.alias_tables.contains(table_id) {
605                    continue;
606                }
607                if let Some(col_idx) = schema.get_column_index(column) {
608                    return Some(schema.columns[col_idx].data_type.sqlite_affinity());
609                }
610            }
611        }
612        None
613    }
614
615    /// Check if an unqualified column reference is ambiguous
616    /// (i.e., exists in multiple tables in the schema)
617    ///
618    /// Returns true if the column exists in more than one table,
619    /// UNLESS the column is a "joined column" from NATURAL JOIN or USING clause.
620    /// Joined columns are deduplicated and should be accessible without qualification.
621    ///
622    /// Only relevant for unqualified column references - qualified references
623    /// (with table prefix) are never ambiguous.
624    pub fn is_column_ambiguous(&self, column: &str) -> bool {
625        // Columns joined via NATURAL JOIN or USING clause are never ambiguous (issue #4517)
626        // They logically represent a single column even though they exist in multiple tables
627        let column_lower = column.to_lowercase();
628        if self.joined_columns.contains(&column_lower) {
629            return false;
630        }
631
632        let mut match_count = 0;
633        for (table_id, (_start_index, schema)) in &self.table_schemas {
634            // Skip alias tables - they're virtual tables for column resolution only
635            // and should not cause ambiguity for unqualified column references (issue #4905)
636            if self.alias_tables.contains(table_id) {
637                continue;
638            }
639            if schema.get_column_index(column).is_some() {
640                match_count += 1;
641                if match_count > 1 {
642                    return true;
643                }
644            }
645        }
646        false
647    }
648
649    /// Check if any table in this schema has a column with the given name.
650    ///
651    /// This is a faster alternative to building a HashSet of all column names
652    /// for cases where we just need to check if a column exists.
653    /// Used for WHERE clause alias resolution (SQLite compatibility).
654    ///
655    /// # Arguments
656    /// * `column` - The column name (case-insensitive)
657    ///
658    /// # Returns
659    /// `true` if any table in the schema has a column matching this name
660    #[inline]
661    pub fn has_column(&self, column: &str) -> bool {
662        // Case-insensitive search through all tables
663        for (_start_index, schema) in self.table_schemas.values() {
664            if schema.get_column_index(column).is_some() {
665                return true;
666            }
667        }
668        false
669    }
670
671    /// Validate that a qualified column reference is not ambiguous.
672    ///
673    /// This checks if the table identifier appears more than once in the FROM clause,
674    /// which would make qualified references like "A.f1" ambiguous (issue #4507).
675    ///
676    /// # Arguments
677    /// * `table` - The table name/alias from the qualified reference
678    /// * `column` - The column name (used for error message only)
679    ///
680    /// # Returns
681    /// * `Ok(())` if the reference is unambiguous
682    /// * `Err(ExecutorError::AmbiguousColumnName)` if the table appears multiple times
683    ///
684    /// # Example
685    /// ```sql
686    /// -- This should fail validation:
687    /// SELECT A.f1 FROM test1 A, test2 A;  -- "A" appears twice
688    /// ```
689    pub fn validate_qualified_reference(
690        &self,
691        table: &str,
692        column: &str,
693    ) -> Result<(), crate::errors::ExecutorError> {
694        let table_id = TableIdentifier::unquoted(table);
695        if self.duplicate_aliases.contains(&table_id) {
696            return Err(crate::errors::ExecutorError::AmbiguousColumnName {
697                column_name: format!("{}.{}", table, column),
698            });
699        }
700        Ok(())
701    }
702
703    /// Get a table schema by name (case-insensitive lookup)
704    pub fn get_table(&self, table_name: &str) -> Option<&(usize, vibesql_catalog::TableSchema)> {
705        self.table_schemas.get(&TableIdentifier::unquoted(table_name))
706    }
707
708    /// Check if a table exists (case-insensitive lookup)
709    pub fn contains_table(&self, table_name: &str) -> bool {
710        self.table_schemas.contains_key(&TableIdentifier::unquoted(table_name))
711    }
712
713    /// Get all table names as strings (using display form)
714    pub fn table_names(&self) -> Vec<String> {
715        self.table_schemas.keys().map(|table_id| table_id.display().to_string()).collect()
716    }
717
718    /// Insert or update a table in the schema
719    pub fn insert_table(
720        &mut self,
721        name: String,
722        start_index: usize,
723        schema: vibesql_catalog::TableSchema,
724    ) {
725        let table_id = TableIdentifier::unquoted(&name);
726        self.table_schemas.insert(table_id, (start_index, schema));
727    }
728
729    /// Get the original column name from the schema for a column reference.
730    ///
731    /// SQLite preserves the schema column name (not the query identifier case)
732    /// when returning column names in results. This method looks up the column
733    /// in the schema and returns the original name.
734    ///
735    /// # Arguments
736    /// * `table` - Optional table name for qualified references (e.g., "t1" in "t1.col")
737    /// * `column` - Column name to look up (case-insensitive)
738    ///
739    /// # Returns
740    /// The original column name from the schema, or the input column name if not found.
741    pub fn get_original_column_name(&self, table: Option<&str>, column: &str) -> String {
742        if let Some(table_name) = table {
743            // Qualified column reference (table.column)
744            let table_id = TableIdentifier::unquoted(table_name);
745            if let Some((_start_index, schema)) = self.table_schemas.get(&table_id) {
746                if let Some(idx) = schema.get_column_index(column) {
747                    return schema.columns[idx].name.clone();
748                }
749            }
750        } else {
751            // Unqualified column reference - search all tables
752            // Find the match with the lowest start_index (leftmost table)
753            let mut best_match: Option<(usize, String)> = None;
754            for (start_index, schema) in self.table_schemas.values() {
755                if let Some(idx) = schema.get_column_index(column) {
756                    let name = schema.columns[idx].name.clone();
757                    match &best_match {
758                        None => best_match = Some((*start_index, name)),
759                        Some((current_start, _)) if *start_index < *current_start => {
760                            best_match = Some((*start_index, name));
761                        }
762                        _ => {}
763                    }
764                }
765            }
766            if let Some((_, name)) = best_match {
767                return name;
768            }
769        }
770        // Fallback: return the input column name if not found in schema
771        column.to_string()
772    }
773
774    /// Get the fully qualified column name with original table name prefix.
775    ///
776    /// This follows SQLite's `full_column_names=ON` behavior where column names
777    /// in results are prefixed with the original table name from the schema.
778    ///
779    /// For example, if a table was created as `CREATE TABLE test1(f1 int)` and
780    /// queried with `SELECT a.f1 FROM test1 a`, this returns `test1.f1` (using
781    /// the original table name "test1", not the alias "a").
782    ///
783    /// # Arguments
784    /// * `table` - Optional table alias/name for qualified references
785    /// * `column` - Column name to look up (case-insensitive)
786    ///
787    /// # Returns
788    /// The fully qualified column name in `table.column` format, or just the
789    /// column name if the table is not found.
790    pub fn get_full_column_name(&self, table: Option<&str>, column: &str) -> String {
791        if let Some(table_name) = table {
792            // Qualified column reference (table.column)
793            let table_id = TableIdentifier::unquoted(table_name);
794            if let Some((_start_index, schema)) = self.table_schemas.get(&table_id) {
795                if let Some(idx) = schema.get_column_index(column) {
796                    // Use the original table name from the schema
797                    return format!("{}.{}", schema.name, schema.columns[idx].name);
798                }
799            }
800        } else {
801            // Unqualified column reference - search all tables
802            // Find the match with the lowest start_index (leftmost table)
803            let mut best_match: Option<(usize, String, String)> = None;
804            for (start_index, schema) in self.table_schemas.values() {
805                if let Some(idx) = schema.get_column_index(column) {
806                    let table_name = schema.name.clone();
807                    let col_name = schema.columns[idx].name.clone();
808                    match &best_match {
809                        None => best_match = Some((*start_index, table_name, col_name)),
810                        Some((current_start, _, _)) if *start_index < *current_start => {
811                            best_match = Some((*start_index, table_name, col_name));
812                        }
813                        _ => {}
814                    }
815                }
816            }
817            if let Some((_, table_name, col_name)) = best_match {
818                return format!("{}.{}", table_name, col_name);
819            }
820        }
821        // Fallback: return the input column name if not found in schema
822        column.to_string()
823    }
824
825    /// Check if a column index is hidden from `SELECT *` expansion.
826    ///
827    /// Columns are hidden when they are duplicates in a NATURAL JOIN.
828    /// For example, in `SELECT * FROM t1 NATURAL JOIN t2` where both tables
829    /// have column `a`, the `t2.a` column is hidden so `SELECT *` only shows
830    /// one copy of `a` (from t1).
831    ///
832    /// However, `SELECT t2.*` should still include `t2.a` because qualified
833    /// wildcards expand all columns from that specific table.
834    #[inline]
835    pub fn is_column_hidden(&self, idx: usize) -> bool {
836        self.hidden_columns.contains(&idx)
837    }
838
839    /// Mark a column as hidden from `SELECT *` expansion.
840    ///
841    /// This is used by NATURAL JOIN to hide duplicate columns from the right side.
842    pub fn hide_column(&mut self, idx: usize) {
843        self.hidden_columns.insert(idx);
844    }
845
846    /// Mark a column name as a "joined column" from NATURAL JOIN or USING clause.
847    ///
848    /// Joined columns exist in multiple tables but should NOT be considered ambiguous
849    /// because they are logically the same column after the join. This allows
850    /// unqualified references to these columns without triggering an ambiguity error.
851    ///
852    /// # Arguments
853    /// * `column` - The column name (will be normalized to lowercase)
854    pub fn add_joined_column(&mut self, column: &str) {
855        self.joined_columns.insert(column.to_lowercase());
856    }
857
858    /// Add a USING column coalesce pair for RIGHT/FULL OUTER JOINs.
859    ///
860    /// For USING columns in OUTER JOINs, unqualified references should use
861    /// COALESCE semantics. This method records a column index for later coalesce evaluation.
862    /// For chained joins, each call extends the existing Vec with new indices.
863    ///
864    /// # Arguments
865    /// * `column` - The column name (will be normalized to lowercase)
866    /// * `left_idx` - Index of the left-side column (first in the chain)
867    /// * `right_idx` - Index of the right-side column (added to the chain)
868    ///
869    /// Issue #4783, #4903: USING column semantics in OUTER JOINs with N-way coalescing
870    pub fn add_using_coalesce_pair(&mut self, column: &str, left_idx: usize, right_idx: usize) {
871        let indices = self
872            .using_coalesce_indices
873            .entry(column.to_lowercase())
874            .or_insert_with(Vec::new);
875
876        // Issue #4909: For chained NATURAL FULL JOINs like `t3 NATURAL FULL JOIN (inner)`,
877        // the Vec may already have entries from the inner join (e.g., [t4.id, t5.id]).
878        // We must INSERT left_idx at the BEGINNING if not present, to get [t3.id, t4.id, t5.id].
879        // This ensures COALESCE picks the leftmost non-NULL value.
880        if !indices.contains(&left_idx) {
881            indices.insert(0, left_idx);
882        }
883        // Always add right_idx at the end if not already present
884        if !indices.contains(&right_idx) {
885            indices.push(right_idx);
886        }
887    }
888
889    /// Get the coalesce pair for a USING column, if any.
890    /// For backwards compatibility, returns the first two indices as a pair.
891    ///
892    /// Returns Some((left_idx, right_idx)) if this column needs COALESCE
893    /// semantics for OUTER JOIN USING, None otherwise.
894    ///
895    /// # Arguments
896    /// * `column` - The column name (will be normalized to lowercase)
897    pub fn get_using_coalesce_pair(&self, column: &str) -> Option<(usize, usize)> {
898        self.using_coalesce_indices
899            .get(&column.to_lowercase())
900            .filter(|indices| indices.len() >= 2)
901            .map(|indices| (indices[0], indices[1]))
902    }
903
904    /// Get all coalesce indices for a USING column (for N-way COALESCE).
905    ///
906    /// Returns Some(&Vec<usize>) containing all column indices that should be
907    /// coalesced for this column name.
908    pub fn get_using_coalesce_indices(&self, column: &str) -> Option<&Vec<usize>> {
909        self.using_coalesce_indices.get(&column.to_lowercase())
910    }
911
912    /// Add a column replacement for SELECT * expansion (for RIGHT/FULL OUTER JOINs).
913    ///
914    /// When a hidden column has a replacement, SELECT * will output the replacement
915    /// column's value at the hidden column's position, maintaining correct column ordering.
916    pub fn add_column_replacement(&mut self, hidden_idx: usize, replacement_idx: usize) {
917        self.column_replacement_map.insert(hidden_idx, replacement_idx);
918    }
919
920    /// Get the replacement column index for a hidden column, if any.
921    pub fn get_column_replacement(&self, hidden_idx: usize) -> Option<usize> {
922        self.column_replacement_map.get(&hidden_idx).copied()
923    }
924
925    /// Get all indices for N-way COALESCE for a left-side USING column (for SELECT *).
926    ///
927    /// In FULL OUTER JOIN with USING clause, when expanding SELECT *, we need to apply
928    /// N-way COALESCE for USING columns. This method returns all indices except the first
929    /// (which is the "left" index) for coalescing.
930    ///
931    /// Returns Some(&[indices]) if the given index is a left-side USING column, None otherwise.
932    pub fn get_using_coalesce_rest_for_left(&self, left_idx: usize) -> Option<&[usize]> {
933        for indices in self.using_coalesce_indices.values() {
934            if !indices.is_empty() && indices[0] == left_idx && indices.len() > 1 {
935                return Some(&indices[1..]);
936            }
937        }
938        None
939    }
940
941    /// Get the right-side column index for a left-side USING column (for COALESCE in SELECT *).
942    /// For backwards compatibility - returns only the second index (first "right" index).
943    ///
944    /// Returns Some(right_idx) if the given index is a left-side USING column, None otherwise.
945    pub fn get_using_coalesce_right_for_left(&self, left_idx: usize) -> Option<usize> {
946        for indices in self.using_coalesce_indices.values() {
947            if !indices.is_empty() && indices[0] == left_idx && indices.len() > 1 {
948                return Some(indices[1]);
949            }
950        }
951        None
952    }
953
954    /// Check if the given column index is a right-side of a USING coalesce chain.
955    ///
956    /// These columns should be skipped in SELECT * output because they're
957    /// represented by the first column with COALESCE applied.
958    pub fn is_using_coalesce_right_side(&self, idx: usize) -> bool {
959        // Check if this index appears in any position other than the first
960        for indices in self.using_coalesce_indices.values() {
961            if indices.len() > 1 && indices[1..].contains(&idx) {
962                return true;
963            }
964        }
965        false
966    }
967
968    /// Get all coalesce indices for a column that's anywhere in the chain.
969    ///
970    /// Unlike `get_using_coalesce_rest_for_left` which only works if the given index
971    /// is the FIRST in the chain, this method returns all indices in the chain if
972    /// the given index is found ANYWHERE in the chain. This is needed for N-way
973    /// coalescing where the visible column might be in the middle of the chain.
974    ///
975    /// Returns Some(&Vec<usize>) if the given index is part of a coalesce chain.
976    pub fn get_all_coalesce_indices_for_column(&self, idx: usize) -> Option<&Vec<usize>> {
977        for indices in self.using_coalesce_indices.values() {
978            if indices.contains(&idx) && indices.len() > 1 {
979                return Some(indices);
980            }
981        }
982        None
983    }
984
985    /// Build a map from column names to their indices.
986    ///
987    /// This is used by window function frame calculations to resolve named column
988    /// references in ORDER BY expressions. The map contains both the original case
989    /// and lowercase versions of column names for case-insensitive matching.
990    ///
991    /// For columns that appear in multiple tables, the mapping prefers the leftmost
992    /// table (lowest start_index) to match the behavior of unqualified column lookups.
993    pub fn build_column_name_map(&self) -> std::collections::HashMap<String, usize> {
994        let mut map = std::collections::HashMap::new();
995
996        // Collect entries sorted by start_index to ensure deterministic ordering
997        let mut entries: Vec<_> = self.table_schemas.iter()
998            .filter(|(table_id, _)| !self.alias_tables.contains(*table_id))
999            .map(|(_, (start_index, schema))| (*start_index, schema))
1000            .collect();
1001        entries.sort_by_key(|(start_index, _)| *start_index);
1002
1003        for (start_index, schema) in entries {
1004            for (idx, col) in schema.columns.iter().enumerate() {
1005                let absolute_idx = start_index + idx;
1006                let name = &col.name;
1007
1008                // Insert original case if not already present
1009                if !map.contains_key(name) {
1010                    map.insert(name.clone(), absolute_idx);
1011                }
1012
1013                // Insert lowercase for case-insensitive matching
1014                let lower = name.to_lowercase();
1015                if !map.contains_key(&lower) {
1016                    map.insert(lower, absolute_idx);
1017                }
1018            }
1019        }
1020
1021        map
1022    }
1023}
1024
1025/// Builder for incrementally constructing a CombinedSchema
1026///
1027/// Builds schemas in O(n) time instead of O(n²) by tracking
1028/// the column offset as tables are added.
1029#[derive(Debug)]
1030pub struct SchemaBuilder {
1031    table_schemas: HashMap<TableIdentifier, (usize, vibesql_catalog::TableSchema)>,
1032    column_offset: usize,
1033    hidden_columns: HashSet<usize>,
1034    duplicate_aliases: HashSet<TableIdentifier>,
1035    joined_columns: HashSet<String>,
1036    using_coalesce_indices: HashMap<String, Vec<usize>>,
1037    column_replacement_map: HashMap<usize, usize>,
1038    alias_tables: HashSet<TableIdentifier>,
1039    shadowed_tables: HashMap<TableIdentifier, HashSet<TableIdentifier>>,
1040}
1041
1042impl SchemaBuilder {
1043    /// Create a new empty schema builder
1044    pub fn new() -> Self {
1045        SchemaBuilder {
1046            table_schemas: HashMap::new(),
1047            column_offset: 0,
1048            hidden_columns: HashSet::new(),
1049            duplicate_aliases: HashSet::new(),
1050            joined_columns: HashSet::new(),
1051            using_coalesce_indices: HashMap::new(),
1052            column_replacement_map: HashMap::new(),
1053            alias_tables: HashSet::new(),
1054            shadowed_tables: HashMap::new(),
1055        }
1056    }
1057
1058    /// Create a schema builder initialized with an existing CombinedSchema
1059    ///
1060    /// Note: Table names are already normalized via TableIdentifier
1061    pub fn from_schema(schema: CombinedSchema) -> Self {
1062        let column_offset = schema.total_columns;
1063        SchemaBuilder {
1064            table_schemas: schema.table_schemas,
1065            column_offset,
1066            hidden_columns: schema.hidden_columns,
1067            duplicate_aliases: schema.duplicate_aliases,
1068            joined_columns: schema.joined_columns,
1069            using_coalesce_indices: schema.using_coalesce_indices,
1070            column_replacement_map: schema.column_replacement_map,
1071            alias_tables: schema.alias_tables,
1072            shadowed_tables: schema.shadowed_tables,
1073        }
1074    }
1075
1076    /// Add a table to the schema
1077    ///
1078    /// This is an O(1) operation - columns are not copied, just indexed
1079    /// Note: Table names are automatically normalized via TableIdentifier for case-insensitive
1080    /// lookups
1081    pub fn add_table(&mut self, name: String, schema: vibesql_catalog::TableSchema) -> &mut Self {
1082        let num_columns = schema.columns.len();
1083        let table_id = TableIdentifier::unquoted(&name);
1084
1085        // Track duplicate table alias/name - but NOT for self-joins
1086        // (same logic as CombinedSchema::combine())
1087        if let Some((_, existing_schema)) = self.table_schemas.get(&table_id) {
1088            // Only mark as duplicate if it's a different table with the same alias
1089            if existing_schema != &schema {
1090                self.duplicate_aliases.insert(table_id.clone());
1091            }
1092        }
1093
1094        self.table_schemas.insert(table_id, (self.column_offset, schema));
1095        self.column_offset += num_columns;
1096        self
1097    }
1098
1099    /// Build the final CombinedSchema
1100    ///
1101    /// This consumes the builder and produces the schema in O(1) time
1102    pub fn build(self) -> CombinedSchema {
1103        CombinedSchema {
1104            table_schemas: self.table_schemas,
1105            total_columns: self.column_offset,
1106            hidden_columns: self.hidden_columns,
1107            outer_schema: None,
1108            duplicate_aliases: self.duplicate_aliases,
1109            joined_columns: self.joined_columns,
1110            using_coalesce_indices: self.using_coalesce_indices,
1111            column_replacement_map: self.column_replacement_map,
1112            alias_tables: self.alias_tables,
1113            shadowed_tables: self.shadowed_tables,
1114        }
1115    }
1116
1117    /// Add a column replacement for SELECT * expansion (for RIGHT/FULL OUTER JOINs)
1118    pub fn add_column_replacement(&mut self, hidden_idx: usize, replacement_idx: usize) {
1119        self.column_replacement_map.insert(hidden_idx, replacement_idx);
1120    }
1121}
1122
1123impl Default for SchemaBuilder {
1124    fn default() -> Self {
1125        Self::new()
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use vibesql_catalog::ColumnSchema;
1132    use vibesql_types::DataType;
1133
1134    use super::*;
1135
1136    /// Helper to create a simple table schema with the given columns
1137    fn table_schema_with_columns(
1138        table_name: &str,
1139        columns: Vec<(&str, DataType)>,
1140    ) -> vibesql_catalog::TableSchema {
1141        let cols: Vec<ColumnSchema> = columns
1142            .into_iter()
1143            .map(|(name, data_type)| ColumnSchema::new(name.to_string(), data_type, true))
1144            .collect();
1145        vibesql_catalog::TableSchema::new(table_name.to_string(), cols)
1146    }
1147
1148    /// Helper to create a table schema with a single column
1149    fn table_schema_with_column(
1150        table_name: &str,
1151        column_name: &str,
1152    ) -> vibesql_catalog::TableSchema {
1153        table_schema_with_columns(table_name, vec![(column_name, DataType::Integer)])
1154    }
1155
1156    // ==========================================================================
1157    // CombinedSchema::from_table - Case-Insensitive Table Name Tests
1158    // ==========================================================================
1159
1160    #[test]
1161    fn test_from_table_uppercase_insertion_case_insensitive_lookup() {
1162        // Insert with uppercase table name
1163        let schema = CombinedSchema::from_table(
1164            "ITEM".to_string(),
1165            table_schema_with_column("ITEM", "price"),
1166        );
1167
1168        // All case variations should find the column
1169        assert!(schema.get_column_index(Some("ITEM"), "price").is_some(), "ITEM should find price");
1170        assert!(schema.get_column_index(Some("item"), "price").is_some(), "item should find price");
1171        assert!(schema.get_column_index(Some("Item"), "price").is_some(), "Item should find price");
1172        assert!(schema.get_column_index(Some("iTEM"), "price").is_some(), "iTEM should find price");
1173    }
1174
1175    #[test]
1176    fn test_from_table_lowercase_insertion_case_insensitive_lookup() {
1177        // Insert with lowercase table name
1178        let schema = CombinedSchema::from_table(
1179            "item".to_string(),
1180            table_schema_with_column("item", "price"),
1181        );
1182
1183        // All case variations should find the column
1184        assert!(schema.get_column_index(Some("ITEM"), "price").is_some());
1185        assert!(schema.get_column_index(Some("item"), "price").is_some());
1186        assert!(schema.get_column_index(Some("Item"), "price").is_some());
1187    }
1188
1189    #[test]
1190    fn test_from_table_mixedcase_insertion_case_insensitive_lookup() {
1191        // Insert with mixed case table name
1192        let schema = CombinedSchema::from_table(
1193            "MyTable".to_string(),
1194            table_schema_with_column("MyTable", "id"),
1195        );
1196
1197        // All case variations should find the column
1198        assert!(schema.get_column_index(Some("MYTABLE"), "id").is_some());
1199        assert!(schema.get_column_index(Some("mytable"), "id").is_some());
1200        assert!(schema.get_column_index(Some("MyTable"), "id").is_some());
1201        assert!(schema.get_column_index(Some("myTable"), "id").is_some());
1202    }
1203
1204    // ==========================================================================
1205    // CombinedSchema::from_derived_table - Case-Insensitive Alias Tests
1206    // ==========================================================================
1207
1208    #[test]
1209    fn test_from_derived_table_case_insensitive_alias() {
1210        // Derived table with uppercase alias
1211        let schema = CombinedSchema::from_derived_table(
1212            "SUBQ".to_string(),
1213            vec!["col1".to_string(), "col2".to_string()],
1214            vec![DataType::Integer, DataType::Varchar { max_length: None }],
1215        );
1216
1217        // All alias case variations should work
1218        assert!(schema.get_column_index(Some("SUBQ"), "col1").is_some());
1219        assert!(schema.get_column_index(Some("subq"), "col1").is_some());
1220        assert!(schema.get_column_index(Some("Subq"), "col1").is_some());
1221    }
1222
1223    // ==========================================================================
1224    // CombinedSchema::combine - Multi-Table Case-Insensitive Tests
1225    // ==========================================================================
1226
1227    #[test]
1228    fn test_combine_case_insensitive_both_tables() {
1229        // Create left schema with uppercase
1230        let left = CombinedSchema::from_table(
1231            "ORDERS".to_string(),
1232            table_schema_with_columns(
1233                "ORDERS",
1234                vec![("order_id", DataType::Integer), ("customer_id", DataType::Integer)],
1235            ),
1236        );
1237
1238        // Combine with right table using different case
1239        let combined = CombinedSchema::combine(
1240            left,
1241            "Items".to_string(),
1242            table_schema_with_columns(
1243                "Items",
1244                vec![("item_id", DataType::Integer), ("price", DataType::DoublePrecision)],
1245            ),
1246        );
1247
1248        // Verify left table columns accessible with any case
1249        assert!(combined.get_column_index(Some("orders"), "order_id").is_some());
1250        assert!(combined.get_column_index(Some("ORDERS"), "order_id").is_some());
1251        assert!(combined.get_column_index(Some("Orders"), "customer_id").is_some());
1252
1253        // Verify right table columns accessible with any case
1254        assert!(combined.get_column_index(Some("items"), "item_id").is_some());
1255        assert!(combined.get_column_index(Some("ITEMS"), "item_id").is_some());
1256        assert!(combined.get_column_index(Some("Items"), "price").is_some());
1257
1258        // Verify correct indices (left table starts at 0, right at 2)
1259        assert_eq!(combined.get_column_index(Some("orders"), "order_id"), Some(0));
1260        assert_eq!(combined.get_column_index(Some("orders"), "customer_id"), Some(1));
1261        assert_eq!(combined.get_column_index(Some("items"), "item_id"), Some(2));
1262        assert_eq!(combined.get_column_index(Some("items"), "price"), Some(3));
1263    }
1264
1265    #[test]
1266    fn test_combine_multiple_joins_case_insensitive() {
1267        // Simulate a 3-way join: orders JOIN customers JOIN items
1268        let orders = CombinedSchema::from_table(
1269            "O".to_string(), // short alias
1270            table_schema_with_column("O", "order_id"),
1271        );
1272
1273        let with_customers = CombinedSchema::combine(
1274            orders,
1275            "C".to_string(),
1276            table_schema_with_column("C", "customer_id"),
1277        );
1278
1279        let with_items = CombinedSchema::combine(
1280            with_customers,
1281            "I".to_string(),
1282            table_schema_with_column("I", "item_id"),
1283        );
1284
1285        // All aliases should be case-insensitive
1286        assert!(with_items.get_column_index(Some("o"), "order_id").is_some());
1287        assert!(with_items.get_column_index(Some("O"), "order_id").is_some());
1288        assert!(with_items.get_column_index(Some("c"), "customer_id").is_some());
1289        assert!(with_items.get_column_index(Some("C"), "customer_id").is_some());
1290        assert!(with_items.get_column_index(Some("i"), "item_id").is_some());
1291        assert!(with_items.get_column_index(Some("I"), "item_id").is_some());
1292    }
1293
1294    // ==========================================================================
1295    // CombinedSchema::get_column_index - Unqualified Column Lookup
1296    // ==========================================================================
1297
1298    #[test]
1299    fn test_unqualified_column_lookup_no_ambiguity() {
1300        let schema = CombinedSchema::from_table(
1301            "USERS".to_string(),
1302            table_schema_with_columns(
1303                "USERS",
1304                vec![("id", DataType::Integer), ("name", DataType::Varchar { max_length: None })],
1305            ),
1306        );
1307
1308        // Unqualified lookup should work
1309        assert!(schema.get_column_index(None, "id").is_some());
1310        assert!(schema.get_column_index(None, "name").is_some());
1311        assert!(schema.get_column_index(None, "missing").is_none());
1312    }
1313
1314    #[test]
1315    fn test_column_case_sensitive_with_fallback() {
1316        // Column created with mixed case (simulating a delimited identifier like "UserName")
1317        let schema = CombinedSchema::from_table(
1318            "users".to_string(),
1319            table_schema_with_column("users", "UserName"),
1320        );
1321
1322        // Exact case match works
1323        assert!(schema.get_column_index(Some("users"), "UserName").is_some());
1324        // Case-insensitive fallback also works for backward compatibility
1325        assert!(schema.get_column_index(Some("users"), "username").is_some());
1326        assert!(schema.get_column_index(Some("users"), "USERNAME").is_some());
1327    }
1328
1329    /// Test case for issue #4111: TPC-DS Q6 scenario
1330    /// Schema created with lowercase column names (from data loader)
1331    /// Query uses uppercase identifiers (from parser normalization)
1332    #[test]
1333    fn test_tpcds_q6_case_insensitive_column_lookup_issue_4111() {
1334        // Simulate TPC-DS item table with lowercase columns (as created by data loader)
1335        let schema = CombinedSchema::from_table(
1336            "J".to_string(), // Uppercase alias from parser
1337            table_schema_with_columns(
1338                "item",
1339                vec![
1340                    ("i_item_sk", DataType::Integer),
1341                    ("i_current_price", DataType::DoublePrecision), // lowercase!
1342                    ("i_category", DataType::Varchar { max_length: None }),
1343                ],
1344            ),
1345        );
1346
1347        // Query uses uppercase column names (from parser normalization)
1348        // This is the exact pattern that fails in TPC-DS Q6:
1349        // SELECT AVG(j.i_current_price) FROM item j WHERE j.i_category = i.i_category
1350        assert!(
1351            schema.get_column_index(Some("J"), "I_CURRENT_PRICE").is_some(),
1352            "J.I_CURRENT_PRICE should find i_current_price via case-insensitive lookup"
1353        );
1354        assert!(
1355            schema.get_column_index(Some("J"), "I_CATEGORY").is_some(),
1356            "J.I_CATEGORY should find i_category via case-insensitive lookup"
1357        );
1358        assert!(
1359            schema.get_column_index(Some("j"), "I_CURRENT_PRICE").is_some(),
1360            "j.I_CURRENT_PRICE should find i_current_price"
1361        );
1362        assert!(
1363            schema.get_column_index(Some("J"), "i_current_price").is_some(),
1364            "J.i_current_price should find via exact match"
1365        );
1366    }
1367
1368    #[test]
1369    fn test_column_distinct_cases_exact_match() {
1370        // When there are multiple columns with different cases (via delimited identifiers),
1371        // exact match takes precedence
1372        let cols: Vec<vibesql_catalog::ColumnSchema> = vec![
1373            vibesql_catalog::ColumnSchema::new("value".to_string(), DataType::Integer, true),
1374            vibesql_catalog::ColumnSchema::new("VALUE".to_string(), DataType::Integer, true),
1375            vibesql_catalog::ColumnSchema::new("Value".to_string(), DataType::Integer, true),
1376        ];
1377        let table_schema = vibesql_catalog::TableSchema::new("data".to_string(), cols);
1378        let schema = CombinedSchema::from_table("data".to_string(), table_schema);
1379
1380        // Each case variation should find its specific column
1381        assert_eq!(schema.get_column_index(Some("data"), "value"), Some(0));
1382        assert_eq!(schema.get_column_index(Some("data"), "VALUE"), Some(1));
1383        assert_eq!(schema.get_column_index(Some("data"), "Value"), Some(2));
1384    }
1385
1386    // ==========================================================================
1387    // SchemaBuilder - Case-Insensitive Tests
1388    // ==========================================================================
1389
1390    #[test]
1391    fn test_schema_builder_add_table_case_insensitive() {
1392        let mut builder = SchemaBuilder::new();
1393
1394        // Add tables with different case
1395        builder.add_table("ORDERS".to_string(), table_schema_with_column("ORDERS", "order_id"));
1396        builder.add_table("Items".to_string(), table_schema_with_column("Items", "item_id"));
1397
1398        let schema = builder.build();
1399
1400        // All case variations should work
1401        assert!(schema.get_column_index(Some("orders"), "order_id").is_some());
1402        assert!(schema.get_column_index(Some("ORDERS"), "order_id").is_some());
1403        assert!(schema.get_column_index(Some("items"), "item_id").is_some());
1404        assert!(schema.get_column_index(Some("ITEMS"), "item_id").is_some());
1405    }
1406
1407    #[test]
1408    fn test_schema_builder_from_schema_preserves_case_insensitivity() {
1409        // Create initial schema with uppercase table name
1410        let initial = CombinedSchema::from_table(
1411            "PRODUCTS".to_string(),
1412            table_schema_with_columns(
1413                "PRODUCTS",
1414                vec![("id", DataType::Integer), ("name", DataType::Varchar { max_length: None })],
1415            ),
1416        );
1417
1418        // Verify initial schema works
1419        assert!(initial.get_column_index(Some("products"), "id").is_some());
1420
1421        // Create builder from schema and add another table
1422        let mut builder = SchemaBuilder::from_schema(initial);
1423        builder
1424            .add_table("Categories".to_string(), table_schema_with_column("Categories", "cat_id"));
1425
1426        let final_schema = builder.build();
1427
1428        // Original table should still be case-insensitive
1429        assert!(final_schema.get_column_index(Some("products"), "id").is_some());
1430        assert!(final_schema.get_column_index(Some("PRODUCTS"), "id").is_some());
1431        assert!(final_schema.get_column_index(Some("Products"), "name").is_some());
1432
1433        // New table should also be case-insensitive
1434        assert!(final_schema.get_column_index(Some("categories"), "cat_id").is_some());
1435        assert!(final_schema.get_column_index(Some("CATEGORIES"), "cat_id").is_some());
1436    }
1437
1438    #[test]
1439    fn test_schema_builder_from_schema_multiple_tables() {
1440        // Create combined schema with multiple tables
1441        let orders = CombinedSchema::from_table(
1442            "Orders".to_string(),
1443            table_schema_with_column("Orders", "order_id"),
1444        );
1445        let combined = CombinedSchema::combine(
1446            orders,
1447            "Items".to_string(),
1448            table_schema_with_column("Items", "item_id"),
1449        );
1450
1451        // Create builder from combined schema
1452        let mut builder = SchemaBuilder::from_schema(combined);
1453        builder
1454            .add_table("CUSTOMERS".to_string(), table_schema_with_column("CUSTOMERS", "cust_id"));
1455
1456        let final_schema = builder.build();
1457
1458        // All tables should be case-insensitive
1459        assert!(final_schema.get_column_index(Some("orders"), "order_id").is_some());
1460        assert!(final_schema.get_column_index(Some("ORDERS"), "order_id").is_some());
1461        assert!(final_schema.get_column_index(Some("items"), "item_id").is_some());
1462        assert!(final_schema.get_column_index(Some("ITEMS"), "item_id").is_some());
1463        assert!(final_schema.get_column_index(Some("customers"), "cust_id").is_some());
1464        assert!(final_schema.get_column_index(Some("CUSTOMERS"), "cust_id").is_some());
1465
1466        // Verify column offsets are correct
1467        assert_eq!(final_schema.get_column_index(Some("orders"), "order_id"), Some(0));
1468        assert_eq!(final_schema.get_column_index(Some("items"), "item_id"), Some(1));
1469        assert_eq!(final_schema.get_column_index(Some("customers"), "cust_id"), Some(2));
1470    }
1471
1472    // ==========================================================================
1473    // Regression Tests for Issue #3633
1474    // ==========================================================================
1475
1476    #[test]
1477    fn test_issue_3633_correlated_subquery_alias_case() {
1478        // This test verifies the fix for issue #3633 where correlated subqueries
1479        // with uppercase aliases (like "J") couldn't find columns because the
1480        // parser uses uppercase but the schema stored lowercase.
1481
1482        // Simulate the scenario: outer query has table with alias "J"
1483        let schema = CombinedSchema::from_table(
1484            "J".to_string(), // Parser often uppercases aliases
1485            table_schema_with_columns(
1486                "items",
1487                vec![("price", DataType::DoublePrecision), ("quantity", DataType::Integer)],
1488            ),
1489        );
1490
1491        // The correlated subquery should be able to reference J.price
1492        // regardless of case used by the parser/resolver
1493        assert!(
1494            schema.get_column_index(Some("J"), "price").is_some(),
1495            "Uppercase J should find price (parser case)"
1496        );
1497        assert!(
1498            schema.get_column_index(Some("j"), "price").is_some(),
1499            "Lowercase j should find price (normalized case)"
1500        );
1501    }
1502
1503    #[test]
1504    fn test_issue_3633_multi_table_join_with_aliases() {
1505        // Simulates: SELECT * FROM orders O JOIN items I ON O.id = I.order_id
1506        let orders = CombinedSchema::from_table(
1507            "O".to_string(),
1508            table_schema_with_columns(
1509                "orders",
1510                vec![("id", DataType::Integer), ("date", DataType::Date)],
1511            ),
1512        );
1513
1514        let combined = CombinedSchema::combine(
1515            orders,
1516            "I".to_string(),
1517            table_schema_with_columns(
1518                "items",
1519                vec![("order_id", DataType::Integer), ("amount", DataType::DoublePrecision)],
1520            ),
1521        );
1522
1523        // Both O and I aliases should work case-insensitively
1524        // This is critical for correlated subqueries that reference outer aliases
1525        assert_eq!(combined.get_column_index(Some("O"), "id"), Some(0));
1526        assert_eq!(combined.get_column_index(Some("o"), "id"), Some(0));
1527        assert_eq!(combined.get_column_index(Some("O"), "date"), Some(1));
1528        assert_eq!(combined.get_column_index(Some("I"), "order_id"), Some(2));
1529        assert_eq!(combined.get_column_index(Some("i"), "order_id"), Some(2));
1530        assert_eq!(combined.get_column_index(Some("I"), "amount"), Some(3));
1531    }
1532
1533    // ==========================================================================
1534    // Edge Cases
1535    // ==========================================================================
1536
1537    #[test]
1538    fn test_nonexistent_table_returns_none() {
1539        let schema = CombinedSchema::from_table(
1540            "users".to_string(),
1541            table_schema_with_column("users", "id"),
1542        );
1543
1544        assert!(schema.get_column_index(Some("nonexistent"), "id").is_none());
1545        assert!(schema.get_column_index(Some("NONEXISTENT"), "id").is_none());
1546    }
1547
1548    #[test]
1549    fn test_nonexistent_column_returns_none() {
1550        let schema = CombinedSchema::from_table(
1551            "users".to_string(),
1552            table_schema_with_column("users", "id"),
1553        );
1554
1555        assert!(schema.get_column_index(Some("users"), "nonexistent").is_none());
1556        assert!(schema.get_column_index(Some("USERS"), "nonexistent").is_none());
1557    }
1558
1559    #[test]
1560    fn test_empty_table_name() {
1561        let schema = CombinedSchema::from_table("".to_string(), table_schema_with_column("", "id"));
1562
1563        // Empty string table should still work
1564        assert!(schema.get_column_index(Some(""), "id").is_some());
1565    }
1566
1567    #[test]
1568    fn test_total_columns_tracking() {
1569        let mut builder = SchemaBuilder::new();
1570        builder.add_table(
1571            "t1".to_string(),
1572            table_schema_with_columns(
1573                "t1",
1574                vec![("a", DataType::Integer), ("b", DataType::Integer)],
1575            ),
1576        );
1577        builder.add_table(
1578            "t2".to_string(),
1579            table_schema_with_columns("t2", vec![("c", DataType::Integer)]),
1580        );
1581
1582        let schema = builder.build();
1583        assert_eq!(schema.total_columns, 3);
1584    }
1585
1586    // ==========================================================================
1587    // Ambiguous Column Detection Tests (Issue #4391)
1588    // ==========================================================================
1589
1590    #[test]
1591    fn test_is_column_ambiguous_single_table() {
1592        // Single table - no column can be ambiguous
1593        let schema = CombinedSchema::from_table(
1594            "test1".to_string(),
1595            table_schema_with_columns(
1596                "test1",
1597                vec![("f1", DataType::Integer), ("f2", DataType::Integer)],
1598            ),
1599        );
1600
1601        assert!(!schema.is_column_ambiguous("f1"));
1602        assert!(!schema.is_column_ambiguous("f2"));
1603        assert!(!schema.is_column_ambiguous("nonexistent"));
1604    }
1605
1606    #[test]
1607    fn test_is_column_ambiguous_two_tables_no_overlap() {
1608        // Two tables with different columns - no ambiguity
1609        let test1 = CombinedSchema::from_table(
1610            "test1".to_string(),
1611            table_schema_with_columns(
1612                "test1",
1613                vec![("f1", DataType::Integer), ("f2", DataType::Integer)],
1614            ),
1615        );
1616        let schema = CombinedSchema::combine(
1617            test1,
1618            "test2".to_string(),
1619            table_schema_with_columns(
1620                "test2",
1621                vec![("f3", DataType::Integer), ("f4", DataType::Integer)],
1622            ),
1623        );
1624
1625        assert!(!schema.is_column_ambiguous("f1"));
1626        assert!(!schema.is_column_ambiguous("f2"));
1627        assert!(!schema.is_column_ambiguous("f3"));
1628        assert!(!schema.is_column_ambiguous("f4"));
1629    }
1630
1631    #[test]
1632    fn test_is_column_ambiguous_two_tables_with_overlap() {
1633        // Two tables with same column names - should be ambiguous
1634        // This is the exact scenario from issue #4391:
1635        // CREATE TABLE test1(f1, f2);
1636        // CREATE TABLE test2(f1, f2);
1637        // SELECT f1 FROM test1, test2;
1638        let test1 = CombinedSchema::from_table(
1639            "test1".to_string(),
1640            table_schema_with_columns(
1641                "test1",
1642                vec![("f1", DataType::Integer), ("f2", DataType::Integer)],
1643            ),
1644        );
1645        let schema = CombinedSchema::combine(
1646            test1,
1647            "test2".to_string(),
1648            table_schema_with_columns(
1649                "test2",
1650                vec![("f1", DataType::Integer), ("f2", DataType::Integer)],
1651            ),
1652        );
1653
1654        // Both f1 and f2 exist in both tables - should be ambiguous
1655        assert!(schema.is_column_ambiguous("f1"), "f1 should be ambiguous");
1656        assert!(schema.is_column_ambiguous("f2"), "f2 should be ambiguous");
1657
1658        // Nonexistent columns are not ambiguous (they just don't exist)
1659        assert!(!schema.is_column_ambiguous("f3"));
1660    }
1661
1662    #[test]
1663    fn test_is_column_ambiguous_case_insensitive() {
1664        // Column names should be matched case-insensitively
1665        let test1 = CombinedSchema::from_table(
1666            "test1".to_string(),
1667            table_schema_with_columns("test1", vec![("F1", DataType::Integer)]),
1668        );
1669        let schema = CombinedSchema::combine(
1670            test1,
1671            "test2".to_string(),
1672            table_schema_with_columns("test2", vec![("f1", DataType::Integer)]),
1673        );
1674
1675        // F1 and f1 should be considered the same column, so it's ambiguous
1676        assert!(schema.is_column_ambiguous("f1"));
1677        assert!(schema.is_column_ambiguous("F1"));
1678        assert!(schema.is_column_ambiguous("F1")); // Mixed case
1679    }
1680
1681    #[test]
1682    fn test_is_column_ambiguous_partial_overlap() {
1683        // Two tables where only some columns overlap
1684        let test1 = CombinedSchema::from_table(
1685            "test1".to_string(),
1686            table_schema_with_columns(
1687                "test1",
1688                vec![("id", DataType::Integer), ("name", DataType::Varchar { max_length: None })],
1689            ),
1690        );
1691        let schema = CombinedSchema::combine(
1692            test1,
1693            "test2".to_string(),
1694            table_schema_with_columns(
1695                "test2",
1696                vec![
1697                    ("id", DataType::Integer),    // Same as test1
1698                    ("value", DataType::Integer), // Different from test1
1699                ],
1700            ),
1701        );
1702
1703        // id is in both tables - ambiguous
1704        assert!(schema.is_column_ambiguous("id"));
1705
1706        // name only in test1, value only in test2 - not ambiguous
1707        assert!(!schema.is_column_ambiguous("name"));
1708        assert!(!schema.is_column_ambiguous("value"));
1709    }
1710
1711    #[test]
1712    fn test_is_column_ambiguous_three_tables() {
1713        // Three tables where a column appears in multiple (but not all)
1714        let t1 = CombinedSchema::from_table(
1715            "t1".to_string(),
1716            table_schema_with_columns(
1717                "t1",
1718                vec![("a", DataType::Integer), ("b", DataType::Integer)],
1719            ),
1720        );
1721        let t1_t2 = CombinedSchema::combine(
1722            t1,
1723            "t2".to_string(),
1724            table_schema_with_columns(
1725                "t2",
1726                vec![("b", DataType::Integer), ("c", DataType::Integer)],
1727            ),
1728        );
1729        let schema = CombinedSchema::combine(
1730            t1_t2,
1731            "t3".to_string(),
1732            table_schema_with_columns(
1733                "t3",
1734                vec![("c", DataType::Integer), ("d", DataType::Integer)],
1735            ),
1736        );
1737
1738        // a only in t1 - not ambiguous
1739        assert!(!schema.is_column_ambiguous("a"));
1740
1741        // b in t1 and t2 - ambiguous
1742        assert!(schema.is_column_ambiguous("b"));
1743
1744        // c in t2 and t3 - ambiguous
1745        assert!(schema.is_column_ambiguous("c"));
1746
1747        // d only in t3 - not ambiguous
1748        assert!(!schema.is_column_ambiguous("d"));
1749    }
1750
1751    // ==========================================================================
1752    // Joined Column Tests (Issue #4517 - NATURAL JOIN columns not ambiguous)
1753    // ==========================================================================
1754
1755    #[test]
1756    fn test_joined_column_not_ambiguous_natural_join() {
1757        // Test that columns marked as "joined" via NATURAL JOIN are not ambiguous
1758        // This simulates: SELECT b FROM t1 NATURAL JOIN t2 WHERE t1.b = t2.b AND t1.c = t2.c
1759        let t1 = CombinedSchema::from_table(
1760            "t1".to_string(),
1761            table_schema_with_columns(
1762                "t1",
1763                vec![("a", DataType::Integer), ("b", DataType::Integer), ("c", DataType::Integer)],
1764            ),
1765        );
1766        let mut schema = CombinedSchema::combine(
1767            t1,
1768            "t2".to_string(),
1769            table_schema_with_columns(
1770                "t2",
1771                vec![("b", DataType::Integer), ("c", DataType::Integer), ("d", DataType::Integer)],
1772            ),
1773        );
1774
1775        // Before marking as joined, b and c should be ambiguous
1776        assert!(
1777            schema.is_column_ambiguous("b"),
1778            "b should be ambiguous before NATURAL JOIN processing"
1779        );
1780        assert!(
1781            schema.is_column_ambiguous("c"),
1782            "c should be ambiguous before NATURAL JOIN processing"
1783        );
1784
1785        // Mark b and c as joined columns (as would happen in NATURAL JOIN processing)
1786        schema.add_joined_column("b");
1787        schema.add_joined_column("c");
1788
1789        // After marking as joined, b and c should NOT be ambiguous
1790        assert!(!schema.is_column_ambiguous("b"), "b should NOT be ambiguous after NATURAL JOIN");
1791        assert!(!schema.is_column_ambiguous("c"), "c should NOT be ambiguous after NATURAL JOIN");
1792
1793        // a only in t1, d only in t2 - never ambiguous
1794        assert!(!schema.is_column_ambiguous("a"));
1795        assert!(!schema.is_column_ambiguous("d"));
1796    }
1797
1798    #[test]
1799    fn test_joined_column_case_insensitive() {
1800        // Test that joined column matching is case-insensitive
1801        let t1 = CombinedSchema::from_table(
1802            "t1".to_string(),
1803            table_schema_with_columns("t1", vec![("Name", DataType::Integer)]),
1804        );
1805        let mut schema = CombinedSchema::combine(
1806            t1,
1807            "t2".to_string(),
1808            table_schema_with_columns("t2", vec![("NAME", DataType::Integer)]),
1809        );
1810
1811        // Before marking as joined, should be ambiguous
1812        assert!(schema.is_column_ambiguous("name"));
1813        assert!(schema.is_column_ambiguous("NAME"));
1814        assert!(schema.is_column_ambiguous("Name"));
1815
1816        // Mark as joined with lowercase
1817        schema.add_joined_column("name");
1818
1819        // All case variants should now be non-ambiguous
1820        assert!(!schema.is_column_ambiguous("name"));
1821        assert!(!schema.is_column_ambiguous("NAME"));
1822        assert!(!schema.is_column_ambiguous("Name"));
1823    }
1824
1825    #[test]
1826    fn test_joined_column_with_using_clause() {
1827        // Test USING clause behavior (similar to NATURAL JOIN but explicit columns)
1828        // This simulates: SELECT id FROM t1 JOIN t2 USING(id)
1829        let t1 = CombinedSchema::from_table(
1830            "t1".to_string(),
1831            table_schema_with_columns(
1832                "t1",
1833                vec![("id", DataType::Integer), ("value1", DataType::Integer)],
1834            ),
1835        );
1836        let mut schema = CombinedSchema::combine(
1837            t1,
1838            "t2".to_string(),
1839            table_schema_with_columns(
1840                "t2",
1841                vec![("id", DataType::Integer), ("value2", DataType::Integer)],
1842            ),
1843        );
1844
1845        // Mark id as joined (as would happen in USING clause processing)
1846        schema.add_joined_column("id");
1847
1848        // id should NOT be ambiguous (it's a USING column)
1849        assert!(!schema.is_column_ambiguous("id"));
1850
1851        // value1 and value2 are unique to their tables - not ambiguous
1852        assert!(!schema.is_column_ambiguous("value1"));
1853        assert!(!schema.is_column_ambiguous("value2"));
1854    }
1855}