datafusion_common/
functional_dependencies.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionalDependencies keeps track of functional dependencies
19//! inside DFSchema.
20
21use std::fmt::{Display, Formatter};
22use std::ops::Deref;
23use std::vec::IntoIter;
24
25use crate::utils::{merge_and_order_indices, set_difference};
26use crate::{DFSchema, HashSet, JoinType};
27
/// This object defines a constraint on a table.
///
/// Each variant carries the indices of the columns that make up the
/// (possibly composite) key. When converted into functional dependencies, a
/// `PrimaryKey` yields a non-nullable determinant whereas a `Unique` key
/// yields a nullable one.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Constraint {
    /// Columns with the given indices form a composite primary key (they are
    /// jointly unique and not nullable):
    PrimaryKey(Vec<usize>),
    /// Columns with the given indices form a composite unique key:
    Unique(Vec<usize>),
}
37
/// This object encapsulates a list of functional constraints.
///
/// The derived `Default` implementation produces an empty list. The wrapped
/// vector is private to this module.
#[derive(Clone, Debug, Default, Eq, Hash, PartialEq, PartialOrd)]
pub struct Constraints {
    // Constraints in the order they were supplied; this type performs no
    // validation on them.
    inner: Vec<Constraint>,
}
43
44impl Constraints {
45    /// Create a new [`Constraints`] object from the given `constraints`.
46    /// Users should use the [`Constraints::default`] or [`SqlToRel::new_constraint_from_table_constraints`]
47    /// functions for constructing [`Constraints`] instances. This constructor
48    /// is for internal purposes only and does not check whether the argument
49    /// is valid. The user is responsible for supplying a valid vector of
50    /// [`Constraint`] objects.
51    ///
52    /// [`SqlToRel::new_constraint_from_table_constraints`]: https://docs.rs/datafusion/latest/datafusion/sql/planner/struct.SqlToRel.html#method.new_constraint_from_table_constraints
53    pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
54        Self { inner: constraints }
55    }
56
57    /// Extends the current constraints with the given `other` constraints.
58    pub fn extend(&mut self, other: Constraints) {
59        self.inner.extend(other.inner);
60    }
61
62    /// Projects constraints using the given projection indices. Returns `None`
63    /// if any of the constraint columns are not included in the projection.
64    pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
65        let projected = self
66            .inner
67            .iter()
68            .filter_map(|constraint| {
69                match constraint {
70                    Constraint::PrimaryKey(indices) => {
71                        let new_indices =
72                            update_elements_with_matching_indices(indices, proj_indices);
73                        // Only keep the constraint if all columns are preserved:
74                        (new_indices.len() == indices.len())
75                            .then_some(Constraint::PrimaryKey(new_indices))
76                    }
77                    Constraint::Unique(indices) => {
78                        let new_indices =
79                            update_elements_with_matching_indices(indices, proj_indices);
80                        // Only keep the constraint if all columns are preserved:
81                        (new_indices.len() == indices.len())
82                            .then_some(Constraint::Unique(new_indices))
83                    }
84                }
85            })
86            .collect::<Vec<_>>();
87
88        (!projected.is_empty()).then_some(Constraints::new_unverified(projected))
89    }
90}
91
92impl IntoIterator for Constraints {
93    type Item = Constraint;
94    type IntoIter = IntoIter<Self::Item>;
95
96    fn into_iter(self) -> Self::IntoIter {
97        self.inner.into_iter()
98    }
99}
100
101impl Display for Constraints {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        let pk = self
104            .inner
105            .iter()
106            .map(|c| format!("{c:?}"))
107            .collect::<Vec<_>>();
108        let pk = pk.join(", ");
109        write!(f, "constraints=[{pk}]")
110    }
111}
112
113impl Deref for Constraints {
114    type Target = [Constraint];
115
116    fn deref(&self) -> &Self::Target {
117        self.inner.as_slice()
118    }
119}
120
121/// This object defines a functional dependence in the schema. A functional
122/// dependence defines a relationship between determinant keys and dependent
123/// columns. A determinant key is a column, or a set of columns, whose value
124/// uniquely determines values of some other (dependent) columns. If two rows
125/// have the same determinant key, dependent columns in these rows are
126/// necessarily the same. If the determinant key is unique, the set of
127/// dependent columns is equal to the entire schema and the determinant key can
128/// serve as a primary key. Note that a primary key may "downgrade" into a
129/// determinant key due to an operation such as a join, and this object is
130/// used to track dependence relationships in such cases. For more information
131/// on functional dependencies, see:
132/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct FunctionalDependence {
135    // Column indices of the (possibly composite) determinant key:
136    pub source_indices: Vec<usize>,
137    // Column indices of dependent column(s):
138    pub target_indices: Vec<usize>,
139    /// Flag indicating whether one of the `source_indices` can receive NULL values.
140    /// For a data source, if the constraint in question is `Constraint::Unique`,
141    /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
142    /// this flag is `false`.
143    /// Note that as the schema changes between different stages in a plan,
144    /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
145    /// change.
146    pub nullable: bool,
147    // The functional dependency mode:
148    pub mode: Dependency,
149}
150
/// Describes functional dependency mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dependency {
    /// A determinant key may occur only once.
    Single,
    /// A determinant key may occur multiple times (in multiple rows).
    Multi,
}
157
158impl FunctionalDependence {
159    // Creates a new functional dependence.
160    pub fn new(
161        source_indices: Vec<usize>,
162        target_indices: Vec<usize>,
163        nullable: bool,
164    ) -> Self {
165        Self {
166            source_indices,
167            target_indices,
168            nullable,
169            // Start with the least restrictive mode by default:
170            mode: Dependency::Multi,
171        }
172    }
173
174    pub fn with_mode(mut self, mode: Dependency) -> Self {
175        self.mode = mode;
176        self
177    }
178}
179
/// This object encapsulates all functional dependencies in a given relation.
///
/// It is a thin wrapper around a vector of [`FunctionalDependence`] objects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionalDependencies {
    // Individual dependencies, in the order they were added:
    deps: Vec<FunctionalDependence>,
}
185
186impl FunctionalDependencies {
187    /// Creates an empty `FunctionalDependencies` object.
188    pub fn empty() -> Self {
189        Self { deps: vec![] }
190    }
191
192    /// Creates a new `FunctionalDependencies` object from a vector of
193    /// `FunctionalDependence` objects.
194    pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
195        Self { deps: dependencies }
196    }
197
198    /// Creates a new `FunctionalDependencies` object from the given constraints.
199    pub fn new_from_constraints(
200        constraints: Option<&Constraints>,
201        n_field: usize,
202    ) -> Self {
203        if let Some(Constraints { inner: constraints }) = constraints {
204            // Construct dependency objects based on each individual constraint:
205            let dependencies = constraints
206                .iter()
207                .map(|constraint| {
208                    // All the field indices are associated with the whole table
209                    // since we are dealing with table level constraints:
210                    let dependency = match constraint {
211                        Constraint::PrimaryKey(indices) => FunctionalDependence::new(
212                            indices.to_vec(),
213                            (0..n_field).collect::<Vec<_>>(),
214                            false,
215                        ),
216                        Constraint::Unique(indices) => FunctionalDependence::new(
217                            indices.to_vec(),
218                            (0..n_field).collect::<Vec<_>>(),
219                            true,
220                        ),
221                    };
222                    // As primary keys are guaranteed to be unique, set the
223                    // functional dependency mode to `Dependency::Single`:
224                    dependency.with_mode(Dependency::Single)
225                })
226                .collect::<Vec<_>>();
227            Self::new(dependencies)
228        } else {
229            // There is no constraint, return an empty object:
230            Self::empty()
231        }
232    }
233
234    pub fn with_dependency(mut self, mode: Dependency) -> Self {
235        self.deps.iter_mut().for_each(|item| item.mode = mode);
236        self
237    }
238
239    /// Merges the given functional dependencies with these.
240    pub fn extend(&mut self, other: FunctionalDependencies) {
241        self.deps.extend(other.deps);
242    }
243
244    /// Sanity checks if functional dependencies are valid. For example, if
245    /// there are 10 fields, we cannot receive any index further than 9.
246    pub fn is_valid(&self, n_field: usize) -> bool {
247        self.deps.iter().all(
248            |FunctionalDependence {
249                 source_indices,
250                 target_indices,
251                 ..
252             }| {
253                source_indices
254                    .iter()
255                    .max()
256                    .map(|&max_index| max_index < n_field)
257                    .unwrap_or(true)
258                    && target_indices
259                        .iter()
260                        .max()
261                        .map(|&max_index| max_index < n_field)
262                        .unwrap_or(true)
263            },
264        )
265    }
266
267    /// Adds the `offset` value to `source_indices` and `target_indices` for
268    /// each functional dependency.
269    pub fn add_offset(&mut self, offset: usize) {
270        self.deps.iter_mut().for_each(
271            |FunctionalDependence {
272                 source_indices,
273                 target_indices,
274                 ..
275             }| {
276                *source_indices = add_offset_to_vec(source_indices, offset);
277                *target_indices = add_offset_to_vec(target_indices, offset);
278            },
279        )
280    }
281
282    /// Updates `source_indices` and `target_indices` of each functional
283    /// dependence using the index mapping given in `proj_indices`.
284    ///
285    /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
286    /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
287    /// In the updated schema, fields at indices \[2, 5, 8\] will transform
288    /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
289    /// be \[1\] -> \[1, 2\].
290    pub fn project_functional_dependencies(
291        &self,
292        proj_indices: &[usize],
293        // The argument `n_out` denotes the schema field length, which is needed
294        // to correctly associate a `Single`-mode dependence with the whole table.
295        n_out: usize,
296    ) -> FunctionalDependencies {
297        let mut projected_func_dependencies = vec![];
298        for FunctionalDependence {
299            source_indices,
300            target_indices,
301            nullable,
302            mode,
303        } in &self.deps
304        {
305            let new_source_indices =
306                update_elements_with_matching_indices(source_indices, proj_indices);
307            let new_target_indices = if *mode == Dependency::Single {
308                // Associate with all of the fields in the schema:
309                (0..n_out).collect()
310            } else {
311                // Update associations according to projection:
312                update_elements_with_matching_indices(target_indices, proj_indices)
313            };
314            // All of the composite indices should still be valid after projection;
315            // otherwise, functional dependency cannot be propagated.
316            if new_source_indices.len() == source_indices.len() {
317                let new_func_dependence = FunctionalDependence::new(
318                    new_source_indices,
319                    new_target_indices,
320                    *nullable,
321                )
322                .with_mode(*mode);
323                projected_func_dependencies.push(new_func_dependence);
324            }
325        }
326        FunctionalDependencies::new(projected_func_dependencies)
327    }
328
329    /// This function joins this set of functional dependencies with the `other`
330    /// according to the given `join_type`.
331    pub fn join(
332        &self,
333        other: &FunctionalDependencies,
334        join_type: &JoinType,
335        left_cols_len: usize,
336    ) -> FunctionalDependencies {
337        // Get mutable copies of left and right side dependencies:
338        let mut right_func_dependencies = other.clone();
339        let mut left_func_dependencies = self.clone();
340
341        match join_type {
342            JoinType::Inner | JoinType::Left | JoinType::Right => {
343                // Add offset to right schema:
344                right_func_dependencies.add_offset(left_cols_len);
345
346                // Result may have multiple values, update the dependency mode:
347                left_func_dependencies =
348                    left_func_dependencies.with_dependency(Dependency::Multi);
349                right_func_dependencies =
350                    right_func_dependencies.with_dependency(Dependency::Multi);
351
352                if *join_type == JoinType::Left {
353                    // Downgrade the right side, since it may have additional NULL values:
354                    right_func_dependencies.downgrade_dependencies();
355                } else if *join_type == JoinType::Right {
356                    // Downgrade the left side, since it may have additional NULL values:
357                    left_func_dependencies.downgrade_dependencies();
358                }
359                // Combine left and right functional dependencies:
360                left_func_dependencies.extend(right_func_dependencies);
361                left_func_dependencies
362            }
363            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
364                // These joins preserve functional dependencies of the left side:
365                left_func_dependencies
366            }
367            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
368                // These joins preserve functional dependencies of the right side:
369                right_func_dependencies
370            }
371            JoinType::Full => {
372                // All of the functional dependencies are lost in a FULL join:
373                FunctionalDependencies::empty()
374            }
375        }
376    }
377
378    /// This function downgrades a functional dependency when nullability becomes
379    /// a possibility:
380    /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
381    ///   invalidates the dependency.
382    /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
383    ///   null value turns it into UNIQUE mode.
384    fn downgrade_dependencies(&mut self) {
385        // Delete nullable dependencies, since they are no longer valid:
386        self.deps.retain(|item| !item.nullable);
387        self.deps.iter_mut().for_each(|item| item.nullable = true);
388    }
389
390    /// This function ensures that functional dependencies involving uniquely
391    /// occurring determinant keys cover their entire table in terms of
392    /// dependent columns.
393    pub fn extend_target_indices(&mut self, n_out: usize) {
394        self.deps.iter_mut().for_each(
395            |FunctionalDependence {
396                 mode,
397                 target_indices,
398                 ..
399             }| {
400                // If unique, cover the whole table:
401                if *mode == Dependency::Single {
402                    *target_indices = (0..n_out).collect::<Vec<_>>();
403                }
404            },
405        )
406    }
407}
408
409impl Deref for FunctionalDependencies {
410    type Target = [FunctionalDependence];
411
412    fn deref(&self) -> &Self::Target {
413        self.deps.as_slice()
414    }
415}
416
417/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
418pub fn aggregate_functional_dependencies(
419    aggr_input_schema: &DFSchema,
420    group_by_expr_names: &[String],
421    aggr_schema: &DFSchema,
422) -> FunctionalDependencies {
423    let mut aggregate_func_dependencies = vec![];
424    let aggr_input_fields = aggr_input_schema.field_names();
425    let aggr_fields = aggr_schema.fields();
426    // Association covers the whole table:
427    let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
428    // Get functional dependencies of the schema:
429    let func_dependencies = aggr_input_schema.functional_dependencies();
430    for FunctionalDependence {
431        source_indices,
432        nullable,
433        mode,
434        ..
435    } in &func_dependencies.deps
436    {
437        // Keep source indices in a `HashSet` to prevent duplicate entries:
438        let mut new_source_indices = vec![];
439        let mut new_source_field_names = vec![];
440        let source_field_names = source_indices
441            .iter()
442            .map(|&idx| &aggr_input_fields[idx])
443            .collect::<Vec<_>>();
444
445        for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
446            // When one of the input determinant expressions matches with
447            // the GROUP BY expression, add the index of the GROUP BY
448            // expression as a new determinant key:
449            if source_field_names.contains(&group_by_expr_name) {
450                new_source_indices.push(idx);
451                new_source_field_names.push(group_by_expr_name.clone());
452            }
453        }
454        let existing_target_indices =
455            get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
456        let new_target_indices = get_target_functional_dependencies(
457            aggr_input_schema,
458            &new_source_field_names,
459        );
460        let mode = if existing_target_indices == new_target_indices
461            && new_target_indices.is_some()
462        {
463            // If dependency covers all GROUP BY expressions, mode will be `Single`:
464            Dependency::Single
465        } else {
466            // Otherwise, existing mode is preserved:
467            *mode
468        };
469        // All of the composite indices occur in the GROUP BY expression:
470        if new_source_indices.len() == source_indices.len() {
471            aggregate_func_dependencies.push(
472                FunctionalDependence::new(
473                    new_source_indices,
474                    target_indices.clone(),
475                    *nullable,
476                )
477                .with_mode(mode),
478            );
479        }
480    }
481
482    // When we have a GROUP BY key, we can guarantee uniqueness after
483    // aggregation:
484    if !group_by_expr_names.is_empty() {
485        let count = group_by_expr_names.len();
486        let source_indices = (0..count).collect::<Vec<_>>();
487        let nullable = source_indices
488            .iter()
489            .any(|idx| aggr_fields[*idx].is_nullable());
490        // If GROUP BY expressions do not already act as a determinant:
491        if !aggregate_func_dependencies.iter().any(|item| {
492            // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
493            // them since `item.source_indices` defines this relation already.
494
495            // The following simple comparison is working well because
496            // GROUP BY expressions come here as a prefix.
497            item.source_indices.iter().all(|idx| idx < &count)
498        }) {
499            // Add a new functional dependency associated with the whole table:
500            // Use nullable property of the GROUP BY expression:
501            aggregate_func_dependencies.push(
502                // Use nullable property of the GROUP BY expression:
503                FunctionalDependence::new(source_indices, target_indices, nullable)
504                    .with_mode(Dependency::Single),
505            );
506        }
507    }
508    FunctionalDependencies::new(aggregate_func_dependencies)
509}
510
511/// Returns target indices, for the determinant keys that are inside
512/// group by expressions.
513pub fn get_target_functional_dependencies(
514    schema: &DFSchema,
515    group_by_expr_names: &[String],
516) -> Option<Vec<usize>> {
517    let mut combined_target_indices = HashSet::new();
518    let dependencies = schema.functional_dependencies();
519    let field_names = schema.field_names();
520    for FunctionalDependence {
521        source_indices,
522        target_indices,
523        ..
524    } in &dependencies.deps
525    {
526        let source_key_names = source_indices
527            .iter()
528            .map(|id_key_idx| &field_names[*id_key_idx])
529            .collect::<Vec<_>>();
530        // If the GROUP BY expression contains a determinant key, we can use
531        // the associated fields after aggregation even if they are not part
532        // of the GROUP BY expression.
533        if source_key_names
534            .iter()
535            .all(|source_key_name| group_by_expr_names.contains(source_key_name))
536        {
537            combined_target_indices.extend(target_indices.iter());
538        }
539    }
540    (!combined_target_indices.is_empty()).then_some({
541        let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
542        result.sort();
543        result
544    })
545}
546
547/// Returns indices for the minimal subset of GROUP BY expressions that are
548/// functionally equivalent to the original set of GROUP BY expressions.
549pub fn get_required_group_by_exprs_indices(
550    schema: &DFSchema,
551    group_by_expr_names: &[String],
552) -> Option<Vec<usize>> {
553    let dependencies = schema.functional_dependencies();
554    let field_names = schema.field_names();
555    let mut groupby_expr_indices = group_by_expr_names
556        .iter()
557        .map(|group_by_expr_name| {
558            field_names
559                .iter()
560                .position(|field_name| field_name == group_by_expr_name)
561        })
562        .collect::<Option<Vec<_>>>()?;
563
564    groupby_expr_indices.sort();
565    for FunctionalDependence {
566        source_indices,
567        target_indices,
568        ..
569    } in &dependencies.deps
570    {
571        if source_indices
572            .iter()
573            .all(|source_idx| groupby_expr_indices.contains(source_idx))
574        {
575            // If all source indices are among GROUP BY expression indices, we
576            // can remove target indices from GROUP BY expression indices and
577            // use source indices instead.
578            groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
579            groupby_expr_indices =
580                merge_and_order_indices(groupby_expr_indices, source_indices);
581        }
582    }
583    groupby_expr_indices
584        .iter()
585        .map(|idx| {
586            group_by_expr_names
587                .iter()
588                .position(|name| &field_names[*idx] == name)
589        })
590        .collect()
591}
592
/// Maps every value of `entries` to the position of its first occurrence
/// inside `proj_indices`. Values that do not occur in `proj_indices` are
/// skipped, so the result may be shorter than `entries`.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    let mut remapped = Vec::new();
    for entry in entries {
        let new_position = proj_indices.iter().position(|candidate| candidate == entry);
        if let Some(new_idx) = new_position {
            remapped.push(new_idx);
        }
    }
    remapped
}
604
/// Returns a new vector holding every entry of `in_data` increased by
/// `offset`, in the same order.
fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
    in_data: &[T],
    offset: T,
) -> Vec<T> {
    let mut shifted = Vec::with_capacity(in_data.len());
    for &item in in_data {
        shifted.push(item + offset);
    }
    shifted
}
612
#[cfg(test)]
mod tests {
    use super::*;

    /// Iterating through `Deref` yields the constraints in insertion order.
    #[test]
    fn constraints_iter() {
        let primary_key = Constraint::PrimaryKey(vec![10]);
        let unique = Constraint::Unique(vec![20]);
        let constraints =
            Constraints::new_unverified(vec![primary_key.clone(), unique.clone()]);

        let visited = constraints.iter().collect::<Vec<_>>();
        assert_eq!(visited, vec![&primary_key, &unique]);
    }

    /// Projection keeps only fully covered constraints and remaps their
    /// column indices.
    #[test]
    fn test_project_constraints() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![0, 3]),
        ]);

        // Keeping columns 1, 2 and 3 preserves the primary key only:
        let expected =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]);
        assert_eq!(constraints.project(&[1, 2, 3]), Some(expected));

        // Keeping only column 0 preserves no constraint at all:
        assert_eq!(constraints.project(&[0]), None);
    }

    /// Projection remaps both the determinant key and the dependent columns.
    #[test]
    fn test_get_updated_id_keys() {
        let original = FunctionalDependence::new(vec![1], vec![0, 1, 2], true);
        let projected = FunctionalDependencies::new(vec![original])
            .project_functional_dependencies(&[1, 2], 2);

        let remapped = FunctionalDependence::new(vec![0], vec![0, 1], true);
        assert_eq!(projected, FunctionalDependencies::new(vec![remapped]));
    }
}
661        assert_eq!(res, expected);
662    }
663}