datafusion_common/functional_dependencies.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionalDependencies keeps track of functional dependencies
19//! inside DFSchema.
20
21use std::fmt::{Display, Formatter};
22use std::ops::Deref;
23use std::vec::IntoIter;
24
25use crate::utils::{merge_and_order_indices, set_difference};
26use crate::{DFSchema, HashSet, JoinType};
27
/// A single constraint declared on a table, expressed in terms of column
/// indices of that table.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd)]
pub enum Constraint {
    /// The columns at these indices together form a (possibly composite)
    /// primary key: they are jointly unique and none of them is nullable.
    PrimaryKey(Vec<usize>),
    /// The columns at these indices together form a (possibly composite)
    /// unique key.
    Unique(Vec<usize>),
}
37
38/// This object encapsulates a list of functional constraints:
39#[derive(Clone, Debug, Default, Eq, Hash, PartialEq, PartialOrd)]
40pub struct Constraints {
41 inner: Vec<Constraint>,
42}
43
44impl Constraints {
45 /// Create a new [`Constraints`] object from the given `constraints`.
46 /// Users should use the [`Constraints::default`] or [`SqlToRel::new_constraint_from_table_constraints`]
47 /// functions for constructing [`Constraints`] instances. This constructor
48 /// is for internal purposes only and does not check whether the argument
49 /// is valid. The user is responsible for supplying a valid vector of
50 /// [`Constraint`] objects.
51 ///
52 /// [`SqlToRel::new_constraint_from_table_constraints`]: https://docs.rs/datafusion/latest/datafusion/sql/planner/struct.SqlToRel.html#method.new_constraint_from_table_constraints
53 pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
54 Self { inner: constraints }
55 }
56
57 /// Extends the current constraints with the given `other` constraints.
58 pub fn extend(&mut self, other: Constraints) {
59 self.inner.extend(other.inner);
60 }
61
62 /// Projects constraints using the given projection indices. Returns `None`
63 /// if any of the constraint columns are not included in the projection.
64 pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
65 let projected = self
66 .inner
67 .iter()
68 .filter_map(|constraint| {
69 match constraint {
70 Constraint::PrimaryKey(indices) => {
71 let new_indices =
72 update_elements_with_matching_indices(indices, proj_indices);
73 // Only keep the constraint if all columns are preserved:
74 (new_indices.len() == indices.len())
75 .then_some(Constraint::PrimaryKey(new_indices))
76 }
77 Constraint::Unique(indices) => {
78 let new_indices =
79 update_elements_with_matching_indices(indices, proj_indices);
80 // Only keep the constraint if all columns are preserved:
81 (new_indices.len() == indices.len())
82 .then_some(Constraint::Unique(new_indices))
83 }
84 }
85 })
86 .collect::<Vec<_>>();
87
88 (!projected.is_empty()).then_some(Constraints::new_unverified(projected))
89 }
90}
91
92impl IntoIterator for Constraints {
93 type Item = Constraint;
94 type IntoIter = IntoIter<Self::Item>;
95
96 fn into_iter(self) -> Self::IntoIter {
97 self.inner.into_iter()
98 }
99}
100
101impl Display for Constraints {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 let pk = self
104 .inner
105 .iter()
106 .map(|c| format!("{c:?}"))
107 .collect::<Vec<_>>();
108 let pk = pk.join(", ");
109 write!(f, "constraints=[{pk}]")
110 }
111}
112
113impl Deref for Constraints {
114 type Target = [Constraint];
115
116 fn deref(&self) -> &Self::Target {
117 self.inner.as_slice()
118 }
119}
120
121/// This object defines a functional dependence in the schema. A functional
122/// dependence defines a relationship between determinant keys and dependent
123/// columns. A determinant key is a column, or a set of columns, whose value
124/// uniquely determines values of some other (dependent) columns. If two rows
125/// have the same determinant key, dependent columns in these rows are
126/// necessarily the same. If the determinant key is unique, the set of
127/// dependent columns is equal to the entire schema and the determinant key can
128/// serve as a primary key. Note that a primary key may "downgrade" into a
129/// determinant key due to an operation such as a join, and this object is
130/// used to track dependence relationships in such cases. For more information
131/// on functional dependencies, see:
132/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct FunctionalDependence {
135 // Column indices of the (possibly composite) determinant key:
136 pub source_indices: Vec<usize>,
137 // Column indices of dependent column(s):
138 pub target_indices: Vec<usize>,
139 /// Flag indicating whether one of the `source_indices` can receive NULL values.
140 /// For a data source, if the constraint in question is `Constraint::Unique`,
141 /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
142 /// this flag is `false`.
143 /// Note that as the schema changes between different stages in a plan,
144 /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
145 /// change.
146 pub nullable: bool,
147 // The functional dependency mode:
148 pub mode: Dependency,
149}
150
/// Describes functional dependency mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dependency {
    /// A determinant key may occur only once.
    Single,
    /// A determinant key may occur multiple times (in multiple rows).
    Multi,
}
157
158impl FunctionalDependence {
159 // Creates a new functional dependence.
160 pub fn new(
161 source_indices: Vec<usize>,
162 target_indices: Vec<usize>,
163 nullable: bool,
164 ) -> Self {
165 Self {
166 source_indices,
167 target_indices,
168 nullable,
169 // Start with the least restrictive mode by default:
170 mode: Dependency::Multi,
171 }
172 }
173
174 pub fn with_mode(mut self, mode: Dependency) -> Self {
175 self.mode = mode;
176 self
177 }
178}
179
180/// This object encapsulates all functional dependencies in a given relation.
181#[derive(Debug, Clone, PartialEq, Eq)]
182pub struct FunctionalDependencies {
183 deps: Vec<FunctionalDependence>,
184}
185
186impl FunctionalDependencies {
187 /// Creates an empty `FunctionalDependencies` object.
188 pub fn empty() -> Self {
189 Self { deps: vec![] }
190 }
191
192 /// Creates a new `FunctionalDependencies` object from a vector of
193 /// `FunctionalDependence` objects.
194 pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
195 Self { deps: dependencies }
196 }
197
198 /// Creates a new `FunctionalDependencies` object from the given constraints.
199 pub fn new_from_constraints(
200 constraints: Option<&Constraints>,
201 n_field: usize,
202 ) -> Self {
203 if let Some(Constraints { inner: constraints }) = constraints {
204 // Construct dependency objects based on each individual constraint:
205 let dependencies = constraints
206 .iter()
207 .map(|constraint| {
208 // All the field indices are associated with the whole table
209 // since we are dealing with table level constraints:
210 let dependency = match constraint {
211 Constraint::PrimaryKey(indices) => FunctionalDependence::new(
212 indices.to_vec(),
213 (0..n_field).collect::<Vec<_>>(),
214 false,
215 ),
216 Constraint::Unique(indices) => FunctionalDependence::new(
217 indices.to_vec(),
218 (0..n_field).collect::<Vec<_>>(),
219 true,
220 ),
221 };
222 // As primary keys are guaranteed to be unique, set the
223 // functional dependency mode to `Dependency::Single`:
224 dependency.with_mode(Dependency::Single)
225 })
226 .collect::<Vec<_>>();
227 Self::new(dependencies)
228 } else {
229 // There is no constraint, return an empty object:
230 Self::empty()
231 }
232 }
233
234 pub fn with_dependency(mut self, mode: Dependency) -> Self {
235 self.deps.iter_mut().for_each(|item| item.mode = mode);
236 self
237 }
238
239 /// Merges the given functional dependencies with these.
240 pub fn extend(&mut self, other: FunctionalDependencies) {
241 self.deps.extend(other.deps);
242 }
243
244 /// Sanity checks if functional dependencies are valid. For example, if
245 /// there are 10 fields, we cannot receive any index further than 9.
246 pub fn is_valid(&self, n_field: usize) -> bool {
247 self.deps.iter().all(
248 |FunctionalDependence {
249 source_indices,
250 target_indices,
251 ..
252 }| {
253 source_indices
254 .iter()
255 .max()
256 .map(|&max_index| max_index < n_field)
257 .unwrap_or(true)
258 && target_indices
259 .iter()
260 .max()
261 .map(|&max_index| max_index < n_field)
262 .unwrap_or(true)
263 },
264 )
265 }
266
267 /// Adds the `offset` value to `source_indices` and `target_indices` for
268 /// each functional dependency.
269 pub fn add_offset(&mut self, offset: usize) {
270 self.deps.iter_mut().for_each(
271 |FunctionalDependence {
272 source_indices,
273 target_indices,
274 ..
275 }| {
276 *source_indices = add_offset_to_vec(source_indices, offset);
277 *target_indices = add_offset_to_vec(target_indices, offset);
278 },
279 )
280 }
281
282 /// Updates `source_indices` and `target_indices` of each functional
283 /// dependence using the index mapping given in `proj_indices`.
284 ///
285 /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
286 /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
287 /// In the updated schema, fields at indices \[2, 5, 8\] will transform
288 /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
289 /// be \[1\] -> \[1, 2\].
290 pub fn project_functional_dependencies(
291 &self,
292 proj_indices: &[usize],
293 // The argument `n_out` denotes the schema field length, which is needed
294 // to correctly associate a `Single`-mode dependence with the whole table.
295 n_out: usize,
296 ) -> FunctionalDependencies {
297 let mut projected_func_dependencies = vec![];
298 for FunctionalDependence {
299 source_indices,
300 target_indices,
301 nullable,
302 mode,
303 } in &self.deps
304 {
305 let new_source_indices =
306 update_elements_with_matching_indices(source_indices, proj_indices);
307 let new_target_indices = if *mode == Dependency::Single {
308 // Associate with all of the fields in the schema:
309 (0..n_out).collect()
310 } else {
311 // Update associations according to projection:
312 update_elements_with_matching_indices(target_indices, proj_indices)
313 };
314 // All of the composite indices should still be valid after projection;
315 // otherwise, functional dependency cannot be propagated.
316 if new_source_indices.len() == source_indices.len() {
317 let new_func_dependence = FunctionalDependence::new(
318 new_source_indices,
319 new_target_indices,
320 *nullable,
321 )
322 .with_mode(*mode);
323 projected_func_dependencies.push(new_func_dependence);
324 }
325 }
326 FunctionalDependencies::new(projected_func_dependencies)
327 }
328
329 /// This function joins this set of functional dependencies with the `other`
330 /// according to the given `join_type`.
331 pub fn join(
332 &self,
333 other: &FunctionalDependencies,
334 join_type: &JoinType,
335 left_cols_len: usize,
336 ) -> FunctionalDependencies {
337 // Get mutable copies of left and right side dependencies:
338 let mut right_func_dependencies = other.clone();
339 let mut left_func_dependencies = self.clone();
340
341 match join_type {
342 JoinType::Inner | JoinType::Left | JoinType::Right => {
343 // Add offset to right schema:
344 right_func_dependencies.add_offset(left_cols_len);
345
346 // Result may have multiple values, update the dependency mode:
347 left_func_dependencies =
348 left_func_dependencies.with_dependency(Dependency::Multi);
349 right_func_dependencies =
350 right_func_dependencies.with_dependency(Dependency::Multi);
351
352 if *join_type == JoinType::Left {
353 // Downgrade the right side, since it may have additional NULL values:
354 right_func_dependencies.downgrade_dependencies();
355 } else if *join_type == JoinType::Right {
356 // Downgrade the left side, since it may have additional NULL values:
357 left_func_dependencies.downgrade_dependencies();
358 }
359 // Combine left and right functional dependencies:
360 left_func_dependencies.extend(right_func_dependencies);
361 left_func_dependencies
362 }
363 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
364 // These joins preserve functional dependencies of the left side:
365 left_func_dependencies
366 }
367 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
368 // These joins preserve functional dependencies of the right side:
369 right_func_dependencies
370 }
371 JoinType::Full => {
372 // All of the functional dependencies are lost in a FULL join:
373 FunctionalDependencies::empty()
374 }
375 }
376 }
377
378 /// This function downgrades a functional dependency when nullability becomes
379 /// a possibility:
380 /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
381 /// invalidates the dependency.
382 /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
383 /// null value turns it into UNIQUE mode.
384 fn downgrade_dependencies(&mut self) {
385 // Delete nullable dependencies, since they are no longer valid:
386 self.deps.retain(|item| !item.nullable);
387 self.deps.iter_mut().for_each(|item| item.nullable = true);
388 }
389
390 /// This function ensures that functional dependencies involving uniquely
391 /// occurring determinant keys cover their entire table in terms of
392 /// dependent columns.
393 pub fn extend_target_indices(&mut self, n_out: usize) {
394 self.deps.iter_mut().for_each(
395 |FunctionalDependence {
396 mode,
397 target_indices,
398 ..
399 }| {
400 // If unique, cover the whole table:
401 if *mode == Dependency::Single {
402 *target_indices = (0..n_out).collect::<Vec<_>>();
403 }
404 },
405 )
406 }
407}
408
409impl Deref for FunctionalDependencies {
410 type Target = [FunctionalDependence];
411
412 fn deref(&self) -> &Self::Target {
413 self.deps.as_slice()
414 }
415}
416
417/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
418pub fn aggregate_functional_dependencies(
419 aggr_input_schema: &DFSchema,
420 group_by_expr_names: &[String],
421 aggr_schema: &DFSchema,
422) -> FunctionalDependencies {
423 let mut aggregate_func_dependencies = vec![];
424 let aggr_input_fields = aggr_input_schema.field_names();
425 let aggr_fields = aggr_schema.fields();
426 // Association covers the whole table:
427 let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
428 // Get functional dependencies of the schema:
429 let func_dependencies = aggr_input_schema.functional_dependencies();
430 for FunctionalDependence {
431 source_indices,
432 nullable,
433 mode,
434 ..
435 } in &func_dependencies.deps
436 {
437 // Keep source indices in a `HashSet` to prevent duplicate entries:
438 let mut new_source_indices = vec![];
439 let mut new_source_field_names = vec![];
440 let source_field_names = source_indices
441 .iter()
442 .map(|&idx| &aggr_input_fields[idx])
443 .collect::<Vec<_>>();
444
445 for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
446 // When one of the input determinant expressions matches with
447 // the GROUP BY expression, add the index of the GROUP BY
448 // expression as a new determinant key:
449 if source_field_names.contains(&group_by_expr_name) {
450 new_source_indices.push(idx);
451 new_source_field_names.push(group_by_expr_name.clone());
452 }
453 }
454 let existing_target_indices =
455 get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
456 let new_target_indices = get_target_functional_dependencies(
457 aggr_input_schema,
458 &new_source_field_names,
459 );
460 let mode = if existing_target_indices == new_target_indices
461 && new_target_indices.is_some()
462 {
463 // If dependency covers all GROUP BY expressions, mode will be `Single`:
464 Dependency::Single
465 } else {
466 // Otherwise, existing mode is preserved:
467 *mode
468 };
469 // All of the composite indices occur in the GROUP BY expression:
470 if new_source_indices.len() == source_indices.len() {
471 aggregate_func_dependencies.push(
472 FunctionalDependence::new(
473 new_source_indices,
474 target_indices.clone(),
475 *nullable,
476 )
477 .with_mode(mode),
478 );
479 }
480 }
481
482 // When we have a GROUP BY key, we can guarantee uniqueness after
483 // aggregation:
484 if !group_by_expr_names.is_empty() {
485 let count = group_by_expr_names.len();
486 let source_indices = (0..count).collect::<Vec<_>>();
487 let nullable = source_indices
488 .iter()
489 .any(|idx| aggr_fields[*idx].is_nullable());
490 // If GROUP BY expressions do not already act as a determinant:
491 if !aggregate_func_dependencies.iter().any(|item| {
492 // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
493 // them since `item.source_indices` defines this relation already.
494
495 // The following simple comparison is working well because
496 // GROUP BY expressions come here as a prefix.
497 item.source_indices.iter().all(|idx| idx < &count)
498 }) {
499 // Add a new functional dependency associated with the whole table:
500 // Use nullable property of the GROUP BY expression:
501 aggregate_func_dependencies.push(
502 // Use nullable property of the GROUP BY expression:
503 FunctionalDependence::new(source_indices, target_indices, nullable)
504 .with_mode(Dependency::Single),
505 );
506 }
507 }
508 FunctionalDependencies::new(aggregate_func_dependencies)
509}
510
511/// Returns target indices, for the determinant keys that are inside
512/// group by expressions.
513pub fn get_target_functional_dependencies(
514 schema: &DFSchema,
515 group_by_expr_names: &[String],
516) -> Option<Vec<usize>> {
517 let mut combined_target_indices = HashSet::new();
518 let dependencies = schema.functional_dependencies();
519 let field_names = schema.field_names();
520 for FunctionalDependence {
521 source_indices,
522 target_indices,
523 ..
524 } in &dependencies.deps
525 {
526 let source_key_names = source_indices
527 .iter()
528 .map(|id_key_idx| &field_names[*id_key_idx])
529 .collect::<Vec<_>>();
530 // If the GROUP BY expression contains a determinant key, we can use
531 // the associated fields after aggregation even if they are not part
532 // of the GROUP BY expression.
533 if source_key_names
534 .iter()
535 .all(|source_key_name| group_by_expr_names.contains(source_key_name))
536 {
537 combined_target_indices.extend(target_indices.iter());
538 }
539 }
540 (!combined_target_indices.is_empty()).then_some({
541 let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
542 result.sort();
543 result
544 })
545}
546
547/// Returns indices for the minimal subset of GROUP BY expressions that are
548/// functionally equivalent to the original set of GROUP BY expressions.
549pub fn get_required_group_by_exprs_indices(
550 schema: &DFSchema,
551 group_by_expr_names: &[String],
552) -> Option<Vec<usize>> {
553 let dependencies = schema.functional_dependencies();
554 let field_names = schema.field_names();
555 let mut groupby_expr_indices = group_by_expr_names
556 .iter()
557 .map(|group_by_expr_name| {
558 field_names
559 .iter()
560 .position(|field_name| field_name == group_by_expr_name)
561 })
562 .collect::<Option<Vec<_>>>()?;
563
564 groupby_expr_indices.sort();
565 for FunctionalDependence {
566 source_indices,
567 target_indices,
568 ..
569 } in &dependencies.deps
570 {
571 if source_indices
572 .iter()
573 .all(|source_idx| groupby_expr_indices.contains(source_idx))
574 {
575 // If all source indices are among GROUP BY expression indices, we
576 // can remove target indices from GROUP BY expression indices and
577 // use source indices instead.
578 groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
579 groupby_expr_indices =
580 merge_and_order_indices(groupby_expr_indices, source_indices);
581 }
582 }
583 groupby_expr_indices
584 .iter()
585 .map(|idx| {
586 group_by_expr_names
587 .iter()
588 .position(|name| &field_names[*idx] == name)
589 })
590 .collect()
591}
592
/// Maps each value in `entries` to the position of its first occurrence in
/// `proj_indices`, silently skipping values that do not occur there.
///
/// For example, entries `[5, 8]` with `proj_indices` `[2, 5, 8]` become
/// `[1, 2]`.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    let mut remapped = Vec::with_capacity(entries.len());
    for entry in entries {
        if let Some(position) = proj_indices.iter().position(|candidate| candidate == entry) {
            remapped.push(position);
        }
    }
    remapped
}
604
/// Returns a new vector holding every element of `in_data` increased by
/// `offset`; the input slice is left untouched.
fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
    in_data: &[T],
    offset: T,
) -> Vec<T> {
    let mut shifted = Vec::with_capacity(in_data.len());
    for &value in in_data {
        shifted.push(value + offset);
    }
    shifted
}
612
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn constraints_iter() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![10]),
            Constraint::Unique(vec![20]),
        ]);
        // `iter` comes from the slice `Deref` target and must keep the
        // original order without adding or dropping entries:
        let collected: Vec<&Constraint> = constraints.iter().collect();
        assert_eq!(
            collected,
            vec![&Constraint::PrimaryKey(vec![10]), &Constraint::Unique(vec![20])]
        );
    }

    #[test]
    fn test_project_constraints() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![0, 3]),
        ]);

        // Keeping columns 1, 2 and 3 preserves only the primary key, whose
        // columns are renumbered to their positions in the projection:
        let expected =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]);
        assert_eq!(constraints.project(&[1, 2, 3]), Some(expected));

        // Keeping only column 0 preserves no constraint at all:
        assert_eq!(constraints.project(&[0]), None);
    }

    #[test]
    fn test_get_updated_id_keys() {
        let func_dependencies = FunctionalDependencies::new(vec![
            FunctionalDependence::new(vec![1], vec![0, 1, 2], true),
        ]);
        // Columns 1 and 2 become columns 0 and 1 after the projection, and
        // column 0 (a dependent only) is dropped:
        let projected = func_dependencies.project_functional_dependencies(&[1, 2], 2);
        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            true,
        )]);
        assert_eq!(projected, expected);
    }
}