datafusion_common/column.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Column
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::parse_identifiers_normalized;
22use crate::utils::quote_identifier;
23use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
24use arrow::datatypes::{Field, FieldRef};
25use std::collections::HashSet;
26use std::fmt;
27
28/// A named reference to a qualified field in a schema.
29#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
30pub struct Column {
31 /// relation/table reference.
32 pub relation: Option<TableReference>,
33 /// field/column name.
34 pub name: String,
35 /// Original source code location, if known
36 pub spans: Spans,
37}
38
39impl fmt::Debug for Column {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("Column")
42 .field("relation", &self.relation)
43 .field("name", &self.name)
44 .finish()
45 }
46}
47
48impl Column {
49 /// Create Column from optional qualifier and name. The optional qualifier, if present,
50 /// will be parsed and normalized by default.
51 ///
52 /// See full details on [`TableReference::parse_str`]
53 ///
54 /// [`TableReference::parse_str`]: crate::TableReference::parse_str
55 pub fn new(
56 relation: Option<impl Into<TableReference>>,
57 name: impl Into<String>,
58 ) -> Self {
59 Self {
60 relation: relation.map(|r| r.into()),
61 name: name.into(),
62 spans: Spans::new(),
63 }
64 }
65
66 /// Convenience method for when there is no qualifier
67 pub fn new_unqualified(name: impl Into<String>) -> Self {
68 Self {
69 relation: None,
70 name: name.into(),
71 spans: Spans::new(),
72 }
73 }
74
75 /// Create Column from unqualified name.
76 ///
77 /// Alias for `Column::new_unqualified`
78 pub fn from_name(name: impl Into<String>) -> Self {
79 Self {
80 relation: None,
81 name: name.into(),
82 spans: Spans::new(),
83 }
84 }
85
86 /// Create a Column from multiple normalized identifiers
87 ///
88 /// For example, `foo.bar` would be represented as a two element vector
89 /// `["foo", "bar"]`
90 fn from_idents(mut idents: Vec<String>) -> Option<Self> {
91 let (relation, name) = match idents.len() {
92 1 => (None, idents.remove(0)),
93 2 => (
94 Some(TableReference::Bare {
95 table: idents.remove(0).into(),
96 }),
97 idents.remove(0),
98 ),
99 3 => (
100 Some(TableReference::Partial {
101 schema: idents.remove(0).into(),
102 table: idents.remove(0).into(),
103 }),
104 idents.remove(0),
105 ),
106 4 => (
107 Some(TableReference::Full {
108 catalog: idents.remove(0).into(),
109 schema: idents.remove(0).into(),
110 table: idents.remove(0).into(),
111 }),
112 idents.remove(0),
113 ),
114 // any expression that failed to parse or has more than 4 period delimited
115 // identifiers will be treated as an unqualified column name
116 _ => return None,
117 };
118 Some(Self {
119 relation,
120 name,
121 spans: Spans::new(),
122 })
123 }
124
125 /// Deserialize a fully qualified name string into a column
126 ///
127 /// Treats the name as a SQL identifier. For example
128 /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
129 /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
130 pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
131 let flat_name = flat_name.into();
132 Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(
133 || Self {
134 relation: None,
135 name: flat_name,
136 spans: Spans::new(),
137 },
138 )
139 }
140
141 /// Deserialize a fully qualified name string into a column preserving column text case
142 #[cfg(feature = "sql")]
143 pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
144 let flat_name = flat_name.into();
145 Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(
146 || Self {
147 relation: None,
148 name: flat_name,
149 spans: Spans::new(),
150 },
151 )
152 }
153
154 #[cfg(not(feature = "sql"))]
155 pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
156 Self::from_qualified_name(flat_name)
157 }
158
159 /// return the column's name.
160 ///
161 /// Note: This ignores the relation and returns the column name only.
162 pub fn name(&self) -> &str {
163 &self.name
164 }
165
166 /// Serialize column into a flat name string
167 pub fn flat_name(&self) -> String {
168 match &self.relation {
169 Some(r) => format!("{}.{}", r, self.name),
170 None => self.name.clone(),
171 }
172 }
173
174 /// Serialize column into a quoted flat name string
175 pub fn quoted_flat_name(&self) -> String {
176 match &self.relation {
177 Some(r) => {
178 format!(
179 "{}.{}",
180 r.to_quoted_string(),
181 quote_identifier(self.name.as_str())
182 )
183 }
184 None => quote_identifier(&self.name).to_string(),
185 }
186 }
187
188 /// Qualify column if not done yet.
189 ///
190 /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
191 /// ignored. Otherwise this will search through the given schemas to find the column.
192 ///
193 /// Will check for ambiguity at each level of `schemas`.
194 ///
195 /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
196 /// exception for `USING` statements, see below.
197 ///
198 /// # Using columns
199 /// Take the following SQL statement:
200 ///
201 /// ```sql
202 /// SELECT id FROM t1 JOIN t2 USING(id)
203 /// ```
204 ///
205 /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
206 /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
207 /// in this example this would be `[{t1.id, t2.id}]`.
208 ///
209 /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
210 /// For example:
211 ///
212 /// ```text
213 /// schemas = &[
214 /// &[schema1, schema2], // first level
215 /// &[schema3, schema4], // second level
216 /// ]
217 /// ```
218 ///
219 /// Will search for a matching field in all schemas in the first level. If a matching field according to above
220 /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
221 /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
222 ///
223 /// If checked all levels and couldn't find field, will return field not found error.
224 pub fn normalize_with_schemas_and_ambiguity_check(
225 self,
226 schemas: &[&[&DFSchema]],
227 using_columns: &[HashSet<Column>],
228 ) -> Result<Self> {
229 if self.relation.is_some() {
230 return Ok(self);
231 }
232
233 for schema_level in schemas {
234 let qualified_fields = schema_level
235 .iter()
236 .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
237 .collect::<Vec<_>>();
238 match qualified_fields.len() {
239 0 => continue,
240 1 => return Ok(Column::from(qualified_fields[0])),
241 _ => {
242 // More than 1 fields in this schema have their names set to self.name.
243 //
244 // This should only happen when a JOIN query with USING constraint references
245 // join columns using unqualified column name. For example:
246 //
247 // ```sql
248 // SELECT id FROM t1 JOIN t2 USING(id)
249 // ```
250 //
251 // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
252 // We will use the relation from the first matched field to normalize self.
253
254 // Compare matched fields with one USING JOIN clause at a time
255 let columns = schema_level
256 .iter()
257 .flat_map(|s| s.columns_with_unqualified_name(&self.name))
258 .collect::<Vec<_>>();
259 for using_col in using_columns {
260 let all_matched = columns.iter().all(|c| using_col.contains(c));
261 // All matched fields belong to the same using column set, in other words
262 // the same join clause. We simply pick the qualifier from the first match.
263 if all_matched {
264 return Ok(columns[0].clone());
265 }
266 }
267
268 // If not due to USING columns then due to ambiguous column name
269 return _schema_err!(SchemaError::AmbiguousReference {
270 field: Box::new(Column::new_unqualified(&self.name)),
271 })
272 .map_err(|err| {
273 let mut diagnostic = Diagnostic::new_error(
274 format!("column '{}' is ambiguous", &self.name),
275 self.spans().first(),
276 );
277 // TODO If [`DFSchema`] had spans, we could show the
278 // user which columns are candidates, or which table
279 // they come from. For now, let's list the table names
280 // only.
281 add_possible_columns_to_diag(
282 &mut diagnostic,
283 &Column::new_unqualified(&self.name),
284 &columns,
285 );
286 err.with_diagnostic(diagnostic)
287 });
288 }
289 }
290 }
291
292 _schema_err!(SchemaError::FieldNotFound {
293 field: Box::new(self),
294 valid_fields: schemas
295 .iter()
296 .flat_map(|s| s.iter())
297 .flat_map(|s| s.columns())
298 .collect(),
299 })
300 }
301
302 /// Returns a reference to the set of locations in the SQL query where this
303 /// column appears, if known.
304 pub fn spans(&self) -> &Spans {
305 &self.spans
306 }
307
308 /// Returns a mutable reference to the set of locations in the SQL query
309 /// where this column appears, if known.
310 pub fn spans_mut(&mut self) -> &mut Spans {
311 &mut self.spans
312 }
313
314 /// Replaces the set of locations in the SQL query where this column
315 /// appears, if known.
316 pub fn with_spans(mut self, spans: Spans) -> Self {
317 self.spans = spans;
318 self
319 }
320
321 /// Qualifies the column with the given table reference.
322 pub fn with_relation(&self, relation: TableReference) -> Self {
323 Self {
324 relation: Some(relation),
325 ..self.clone()
326 }
327 }
328}
329
330impl From<&str> for Column {
331 fn from(c: &str) -> Self {
332 Self::from_qualified_name(c)
333 }
334}
335
336/// Create a column, cloning the string
337impl From<&String> for Column {
338 fn from(c: &String) -> Self {
339 Self::from_qualified_name(c)
340 }
341}
342
343/// Create a column, reusing the existing string
344impl From<String> for Column {
345 fn from(c: String) -> Self {
346 Self::from_qualified_name(c)
347 }
348}
349
350/// Create a column, use qualifier and field name
351impl From<(Option<&TableReference>, &Field)> for Column {
352 fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
353 Self::new(relation.cloned(), field.name())
354 }
355}
356
357/// Create a column, use qualifier and field name
358impl From<(Option<&TableReference>, &FieldRef)> for Column {
359 fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
360 Self::new(relation.cloned(), field.name())
361 }
362}
363
364#[cfg(feature = "sql")]
365impl std::str::FromStr for Column {
366 type Err = std::convert::Infallible;
367
368 fn from_str(s: &str) -> Result<Self, Self::Err> {
369 Ok(s.into())
370 }
371}
372
373impl fmt::Display for Column {
374 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375 write!(f, "{}", self.flat_name())
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382 use arrow::datatypes::{DataType, SchemaBuilder};
383 use std::sync::Arc;
384
385 fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
386 let mut schema_builder = SchemaBuilder::new();
387 schema_builder.extend(
388 names
389 .iter()
390 .map(|f| Field::new(*f, DataType::Boolean, true)),
391 );
392 let schema = Arc::new(schema_builder.finish());
393 DFSchema::try_from_qualified_schema(qualifier, &schema)
394 }
395
396 #[test]
397 fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
398 let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
399 let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
400 let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;
401
402 // already normalized
403 let col = Column::new(Some("t1"), "a");
404 let col = col.normalize_with_schemas_and_ambiguity_check(&[], &[])?;
405 assert_eq!(col, Column::new(Some("t1"), "a"));
406
407 // should find in first level (schema1)
408 let col = Column::from_name("a");
409 let col = col.normalize_with_schemas_and_ambiguity_check(
410 &[&[&schema1, &schema2], &[&schema3]],
411 &[],
412 )?;
413 assert_eq!(col, Column::new(Some("t1"), "a"));
414
415 // should find in second level (schema3)
416 let col = Column::from_name("e");
417 let col = col.normalize_with_schemas_and_ambiguity_check(
418 &[&[&schema1, &schema2], &[&schema3]],
419 &[],
420 )?;
421 assert_eq!(col, Column::new(Some("t3"), "e"));
422
423 // using column in first level (pick schema1)
424 let mut using_columns = HashSet::new();
425 using_columns.insert(Column::new(Some("t1"), "a"));
426 using_columns.insert(Column::new(Some("t3"), "a"));
427 let col = Column::from_name("a");
428 let col = col.normalize_with_schemas_and_ambiguity_check(
429 &[&[&schema1, &schema3], &[&schema2]],
430 &[using_columns],
431 )?;
432 assert_eq!(col, Column::new(Some("t1"), "a"));
433
434 // not found in any level
435 let col = Column::from_name("z");
436 let err = col
437 .normalize_with_schemas_and_ambiguity_check(
438 &[&[&schema1, &schema2], &[&schema3]],
439 &[],
440 )
441 .expect_err("should've failed to find field");
442 let expected = "Schema error: No field named z. \
443 Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e.";
444 assert_eq!(err.strip_backtrace(), expected);
445
446 // ambiguous column reference
447 let col = Column::from_name("a");
448 let err = col
449 .normalize_with_schemas_and_ambiguity_check(
450 &[&[&schema1, &schema3], &[&schema2]],
451 &[],
452 )
453 .expect_err("should've found ambiguous field");
454 let expected = "Schema error: Ambiguous reference to unqualified field a";
455 assert_eq!(err.strip_backtrace(), expected);
456
457 Ok(())
458 }
459}