datafusion_common/
column.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Column
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::parse_identifiers_normalized;
22use crate::utils::quote_identifier;
23use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
24use arrow::datatypes::{Field, FieldRef};
25use std::collections::HashSet;
26use std::fmt;
27
28/// A named reference to a qualified field in a schema.
29#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
30pub struct Column {
31    /// relation/table reference.
32    pub relation: Option<TableReference>,
33    /// field/column name.
34    pub name: String,
35    /// Original source code location, if known
36    pub spans: Spans,
37}
38
39impl fmt::Debug for Column {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("Column")
42            .field("relation", &self.relation)
43            .field("name", &self.name)
44            .finish()
45    }
46}
47
48impl Column {
49    /// Create Column from optional qualifier and name. The optional qualifier, if present,
50    /// will be parsed and normalized by default.
51    ///
52    /// See full details on [`TableReference::parse_str`]
53    ///
54    /// [`TableReference::parse_str`]: crate::TableReference::parse_str
55    pub fn new(
56        relation: Option<impl Into<TableReference>>,
57        name: impl Into<String>,
58    ) -> Self {
59        Self {
60            relation: relation.map(|r| r.into()),
61            name: name.into(),
62            spans: Spans::new(),
63        }
64    }
65
66    /// Convenience method for when there is no qualifier
67    pub fn new_unqualified(name: impl Into<String>) -> Self {
68        Self {
69            relation: None,
70            name: name.into(),
71            spans: Spans::new(),
72        }
73    }
74
75    /// Create Column from unqualified name.
76    ///
77    /// Alias for `Column::new_unqualified`
78    pub fn from_name(name: impl Into<String>) -> Self {
79        Self {
80            relation: None,
81            name: name.into(),
82            spans: Spans::new(),
83        }
84    }
85
86    /// Create a Column from multiple normalized identifiers
87    ///
88    /// For example, `foo.bar` would be represented as a two element vector
89    /// `["foo", "bar"]`
90    fn from_idents(mut idents: Vec<String>) -> Option<Self> {
91        let (relation, name) = match idents.len() {
92            1 => (None, idents.remove(0)),
93            2 => (
94                Some(TableReference::Bare {
95                    table: idents.remove(0).into(),
96                }),
97                idents.remove(0),
98            ),
99            3 => (
100                Some(TableReference::Partial {
101                    schema: idents.remove(0).into(),
102                    table: idents.remove(0).into(),
103                }),
104                idents.remove(0),
105            ),
106            4 => (
107                Some(TableReference::Full {
108                    catalog: idents.remove(0).into(),
109                    schema: idents.remove(0).into(),
110                    table: idents.remove(0).into(),
111                }),
112                idents.remove(0),
113            ),
114            // any expression that failed to parse or has more than 4 period delimited
115            // identifiers will be treated as an unqualified column name
116            _ => return None,
117        };
118        Some(Self {
119            relation,
120            name,
121            spans: Spans::new(),
122        })
123    }
124
125    /// Deserialize a fully qualified name string into a column
126    ///
127    /// Treats the name as a SQL identifier. For example
128    /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
129    /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
130    pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
131        let flat_name = flat_name.into();
132        Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(
133            || Self {
134                relation: None,
135                name: flat_name,
136                spans: Spans::new(),
137            },
138        )
139    }
140
141    /// Deserialize a fully qualified name string into a column preserving column text case
142    #[cfg(feature = "sql")]
143    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
144        let flat_name = flat_name.into();
145        Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(
146            || Self {
147                relation: None,
148                name: flat_name,
149                spans: Spans::new(),
150            },
151        )
152    }
153
154    #[cfg(not(feature = "sql"))]
155    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
156        Self::from_qualified_name(flat_name)
157    }
158
159    /// return the column's name.
160    ///
161    /// Note: This ignores the relation and returns the column name only.
162    pub fn name(&self) -> &str {
163        &self.name
164    }
165
166    /// Serialize column into a flat name string
167    pub fn flat_name(&self) -> String {
168        match &self.relation {
169            Some(r) => format!("{}.{}", r, self.name),
170            None => self.name.clone(),
171        }
172    }
173
174    /// Serialize column into a quoted flat name string
175    pub fn quoted_flat_name(&self) -> String {
176        match &self.relation {
177            Some(r) => {
178                format!(
179                    "{}.{}",
180                    r.to_quoted_string(),
181                    quote_identifier(self.name.as_str())
182                )
183            }
184            None => quote_identifier(&self.name).to_string(),
185        }
186    }
187
188    /// Qualify column if not done yet.
189    ///
190    /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
191    /// ignored. Otherwise this will search through the given schemas to find the column.
192    ///
193    /// Will check for ambiguity at each level of `schemas`.
194    ///
195    /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
196    /// exception for `USING` statements, see below.
197    ///
198    /// # Using columns
199    /// Take the following SQL statement:
200    ///
201    /// ```sql
202    /// SELECT id FROM t1 JOIN t2 USING(id)
203    /// ```
204    ///
205    /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
206    /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
207    /// in this example this would be `[{t1.id, t2.id}]`.
208    ///
209    /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
210    /// For example:
211    ///
212    /// ```text
213    /// schemas = &[
214    ///    &[schema1, schema2], // first level
215    ///    &[schema3, schema4], // second level
216    /// ]
217    /// ```
218    ///
219    /// Will search for a matching field in all schemas in the first level. If a matching field according to above
220    /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
221    /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
222    ///
223    /// If checked all levels and couldn't find field, will return field not found error.
224    pub fn normalize_with_schemas_and_ambiguity_check(
225        self,
226        schemas: &[&[&DFSchema]],
227        using_columns: &[HashSet<Column>],
228    ) -> Result<Self> {
229        if self.relation.is_some() {
230            return Ok(self);
231        }
232
233        for schema_level in schemas {
234            let qualified_fields = schema_level
235                .iter()
236                .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
237                .collect::<Vec<_>>();
238            match qualified_fields.len() {
239                0 => continue,
240                1 => return Ok(Column::from(qualified_fields[0])),
241                _ => {
242                    // More than 1 fields in this schema have their names set to self.name.
243                    //
244                    // This should only happen when a JOIN query with USING constraint references
245                    // join columns using unqualified column name. For example:
246                    //
247                    // ```sql
248                    // SELECT id FROM t1 JOIN t2 USING(id)
249                    // ```
250                    //
251                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
252                    // We will use the relation from the first matched field to normalize self.
253
254                    // Compare matched fields with one USING JOIN clause at a time
255                    let columns = schema_level
256                        .iter()
257                        .flat_map(|s| s.columns_with_unqualified_name(&self.name))
258                        .collect::<Vec<_>>();
259                    for using_col in using_columns {
260                        let all_matched = columns.iter().all(|c| using_col.contains(c));
261                        // All matched fields belong to the same using column set, in other words
262                        // the same join clause. We simply pick the qualifier from the first match.
263                        if all_matched {
264                            return Ok(columns[0].clone());
265                        }
266                    }
267
268                    // If not due to USING columns then due to ambiguous column name
269                    return _schema_err!(SchemaError::AmbiguousReference {
270                        field: Box::new(Column::new_unqualified(&self.name)),
271                    })
272                    .map_err(|err| {
273                        let mut diagnostic = Diagnostic::new_error(
274                            format!("column '{}' is ambiguous", &self.name),
275                            self.spans().first(),
276                        );
277                        // TODO If [`DFSchema`] had spans, we could show the
278                        // user which columns are candidates, or which table
279                        // they come from. For now, let's list the table names
280                        // only.
281                        add_possible_columns_to_diag(
282                            &mut diagnostic,
283                            &Column::new_unqualified(&self.name),
284                            &columns,
285                        );
286                        err.with_diagnostic(diagnostic)
287                    });
288                }
289            }
290        }
291
292        _schema_err!(SchemaError::FieldNotFound {
293            field: Box::new(self),
294            valid_fields: schemas
295                .iter()
296                .flat_map(|s| s.iter())
297                .flat_map(|s| s.columns())
298                .collect(),
299        })
300    }
301
302    /// Returns a reference to the set of locations in the SQL query where this
303    /// column appears, if known.
304    pub fn spans(&self) -> &Spans {
305        &self.spans
306    }
307
308    /// Returns a mutable reference to the set of locations in the SQL query
309    /// where this column appears, if known.
310    pub fn spans_mut(&mut self) -> &mut Spans {
311        &mut self.spans
312    }
313
314    /// Replaces the set of locations in the SQL query where this column
315    /// appears, if known.
316    pub fn with_spans(mut self, spans: Spans) -> Self {
317        self.spans = spans;
318        self
319    }
320
321    /// Qualifies the column with the given table reference.
322    pub fn with_relation(&self, relation: TableReference) -> Self {
323        Self {
324            relation: Some(relation),
325            ..self.clone()
326        }
327    }
328}
329
330impl From<&str> for Column {
331    fn from(c: &str) -> Self {
332        Self::from_qualified_name(c)
333    }
334}
335
336/// Create a column, cloning the string
337impl From<&String> for Column {
338    fn from(c: &String) -> Self {
339        Self::from_qualified_name(c)
340    }
341}
342
343/// Create a column, reusing the existing string
344impl From<String> for Column {
345    fn from(c: String) -> Self {
346        Self::from_qualified_name(c)
347    }
348}
349
350/// Create a column, use qualifier and field name
351impl From<(Option<&TableReference>, &Field)> for Column {
352    fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
353        Self::new(relation.cloned(), field.name())
354    }
355}
356
357/// Create a column, use qualifier and field name
358impl From<(Option<&TableReference>, &FieldRef)> for Column {
359    fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
360        Self::new(relation.cloned(), field.name())
361    }
362}
363
/// Parsing a column from a string cannot fail; it goes through the
/// `From<&str>` conversion.
#[cfg(feature = "sql")]
impl std::str::FromStr for Column {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self::from(s))
    }
}
372
373impl fmt::Display for Column {
374    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375        write!(f, "{}", self.flat_name())
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, SchemaBuilder};
    use std::sync::Arc;

    /// Builds a schema of nullable boolean fields named after `names`, all
    /// qualified with `qualifier`.
    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
        let mut builder = SchemaBuilder::new();
        builder.extend(
            names
                .into_iter()
                .map(|name| Field::new(name, DataType::Boolean, true)),
        );
        let arrow_schema = Arc::new(builder.finish());
        DFSchema::try_from_qualified_schema(qualifier, &arrow_schema)
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
        let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
        let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

        // a qualified column comes back untouched, even with no schemas at all
        let resolved = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // `a` is resolved by the first level (schema1)
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // `e` only exists in the second level (schema3)
        let resolved = Column::from_name("e").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t3"), "e"));

        // two matches in the first level, tied together by a USING set:
        // the first match (schema1) wins
        let using_set = HashSet::from([
            Column::new(Some("t1"), "a"),
            Column::new(Some("t3"), "a"),
        ]);
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema3], &[&schema2]],
            &[using_set],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // `z` does not exist in any level
        let not_found = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema2], &[&schema3]],
                &[],
            )
            .expect_err("should've failed to find field");
        assert_eq!(
            not_found.strip_backtrace(),
            "Schema error: No field named z. \
            Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e."
        );

        // two matches in the first level and no USING set: ambiguous
        let ambiguous = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema3], &[&schema2]],
                &[],
            )
            .expect_err("should've found ambiguous field");
        assert_eq!(
            ambiguous.strip_backtrace(),
            "Schema error: Ambiguous reference to unqualified field a"
        );

        Ok(())
    }
}
459}