1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// SPDX-License-Identifier: Apache-2.0

//! Module providing parse/validation functions for set relations.
//!
//! The set operation encompasses several set-level operations that support
//! combining datasets, possibly excluding records based on various types of
//! record-level matching.
//!
//! See <https://substrait.io/relations/logical_relations/#set-operation>

use std::sync::Arc;

use crate::input::proto::substrait;
use crate::output::diagnostic;
use crate::parse::context;
use crate::parse::types;

/// Internal classification of a set relation, used by [`parse_set_rel`] to
/// pick the description and summary attached to the relation.
///
/// The classification is derived from the `SetOp` value in the plan combined
/// with whether the relation has more than two inputs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Operation {
    /// The set operation was left unspecified.
    Invalid,
    /// Subtraction with at most two inputs: rows of the first dataset that
    /// do not exist in the second.
    Subtract,
    /// Primary subtraction with more than two inputs: rows of the first
    /// dataset that do not exist in any of the other datasets.
    SubtractByUnion,
    /// Multiset subtraction with more than two inputs: rows of the first
    /// dataset that do not exist in all of the other datasets.
    SubtractByIntersection,
    /// Intersection: rows of the first dataset that exist in all datasets.
    Intersect,
    /// Primary intersection with more than two inputs: rows of the first
    /// dataset that exist in any of the other datasets.
    IntersectWithUnion,
    /// Distinct union: rows that exist in any dataset, duplicates removed.
    Union,
    /// Union-all: all rows from all incoming datasets.
    Merge,
}

/// Parse set relation.
pub fn parse_set_rel(x: &substrait::SetRel, y: &mut context::Context) -> diagnostic::Result<()> {
    use substrait::set_rel::SetOp;

    // Parse inputs.
    let in_types: Vec<_> = handle_rel_inputs!(x, y).collect();

    // Check inputs and derive schema.
    if in_types.len() < 2 {
        diagnostic!(
            y,
            Error,
            RelationMissing,
            "set operations require at least two input relations"
        );
    }
    let mut schema = Arc::default();
    for in_type in in_types.iter() {
        schema = types::assert_equal(
            y,
            &in_type.strip_field_names(),
            &schema,
            "all set inputs must have matching schemas",
        );
    }
    y.set_schema(schema);

    // Check set operation.
    let op = proto_required_enum_field!(x, y, op, SetOp)
        .1
        .unwrap_or_default();
    let op = match (op, in_types.len() > 2) {
        (SetOp::Unspecified, _) => Operation::Invalid,
        (SetOp::MinusPrimary, true) => Operation::SubtractByUnion,
        (SetOp::MinusPrimary, false) => Operation::Subtract,
        (SetOp::MinusMultiset, true) => Operation::SubtractByIntersection,
        (SetOp::MinusMultiset, false) => Operation::Subtract,
        (SetOp::IntersectionPrimary, true) => Operation::IntersectWithUnion,
        (SetOp::IntersectionPrimary, false) => Operation::Intersect,
        (SetOp::IntersectionMultiset, _) => Operation::Intersect,
        (SetOp::UnionDistinct, _) => Operation::Union,
        (SetOp::UnionAll, _) => Operation::Merge,
    };

    // Describe the relation.
    match op {
        Operation::Invalid => {
            describe!(y, Relation, "Invalid set operation");
        }
        Operation::Subtract => {
            describe!(y, Relation, "Set subtraction");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist \
                in the second dataset."
            );
        }
        Operation::SubtractByUnion => {
            describe!(y, Relation, "Set subtract by union");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist \
                in any of the other datasets."
            );
        }
        Operation::SubtractByIntersection => {
            describe!(y, Relation, "Set subtract by intersection");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist in \
                all of the other datasets."
            );
        }
        Operation::Intersect => {
            describe!(y, Relation, "Set intersection");
            summary!(
                y,
                "Yields all rows from the first dataset that exist in all \
                datasets."
            );
        }
        Operation::IntersectWithUnion => {
            describe!(y, Relation, "Set intersect with union");
            summary!(
                y,
                "Yields all rows from the first dataset that exist in any of \
                the other datasets."
            );
        }
        Operation::Union => {
            describe!(y, Relation, "Set union");
            summary!(
                y,
                "Yields all rows that exist in any dataset, removing duplicates."
            );
        }
        Operation::Merge => {
            describe!(y, Relation, "Merge");
            summary!(y, "Yields all rows from all incoming datasets.");
        }
    };

    // Handle the common field.
    handle_rel_common!(x, y);

    // Handle the advanced extension field.
    handle_advanced_extension!(x, y);

    Ok(())
}