1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::sync::Arc;
use crate::input::proto::substrait;
use crate::output::diagnostic;
use crate::parse::context;
use crate::parse::types;
/// Kind of set operation, as classified by [`parse_set_rel`] from the
/// relation's `op` field and its number of inputs.
///
/// The classification only drives the description and summary attached to
/// the relation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operation {
    /// The operation was left unspecified.
    Invalid,

    /// Yields the rows of the first dataset that do not exist in the second
    /// dataset. Used for both minus operations when there are no more than
    /// two inputs.
    Subtract,

    /// Primary minus with more than two inputs: yields the rows of the first
    /// dataset that do not exist in any of the other datasets.
    SubtractByUnion,

    /// Multiset minus with more than two inputs: yields the rows of the first
    /// dataset that do not exist in all of the other datasets.
    SubtractByIntersection,

    /// Yields the rows of the first dataset that exist in all datasets.
    Intersect,

    /// Primary intersection with more than two inputs: yields the rows of the
    /// first dataset that exist in any of the other datasets.
    IntersectWithUnion,

    /// Yields all rows that exist in any dataset, removing duplicates.
    Union,

    /// Yields all rows from all incoming datasets.
    Merge,
}
/// Parses and validates a set relation, recording the outcome in the context.
///
/// The steps run in this order:
/// - the input relations are handled through `handle_rel_inputs!`, and a
///   `RelationMissing` error diagnostic is emitted when fewer than two inputs
///   are present (parsing continues regardless);
/// - each input type, after `strip_field_names()`, is passed through
///   `types::assert_equal` together with the result obtained for the previous
///   inputs; the final result is set as the schema of this relation;
/// - the required `op` field is read and, combined with whether there are
///   more than two inputs, classified into an [`Operation`] that selects the
///   description and summary;
/// - the common relation fields and the advanced extension are handled.
///
/// `x` is the set relation message to parse; `y` is the parse context that
/// receives the schema, description, summary and diagnostics.
pub fn parse_set_rel(x: &substrait::SetRel, y: &mut context::Context) -> diagnostic::Result<()> {
    use substrait::set_rel::SetOp;

    // Handle the inputs; a set operation needs at least two of them.
    let input_types: Vec<_> = handle_rel_inputs!(x, y).collect();
    let input_count = input_types.len();
    if input_count < 2 {
        diagnostic!(
            y,
            Error,
            RelationMissing,
            "set operations require at least two input relations"
        );
    }

    // Fold all input schemas into one, starting from the default type and
    // comparing each input (without field names) against the running result.
    let common_schema = input_types
        .iter()
        .fold(Arc::default(), |expected, input_type| {
            types::assert_equal(
                y,
                &input_type.strip_field_names(),
                &expected,
                "all set inputs must have matching schemas",
            )
        });
    y.set_schema(common_schema);

    // Read the operation; a missing value falls back to the enum's default.
    let set_op = proto_required_enum_field!(x, y, op, SetOp)
        .1
        .unwrap_or_default();

    // The primary/multiset flavors are only distinguished when there is more
    // than one secondary input.
    let many_secondaries = input_count > 2;
    let operation = match set_op {
        SetOp::Unspecified => Operation::Invalid,
        SetOp::MinusPrimary if many_secondaries => Operation::SubtractByUnion,
        SetOp::MinusMultiset if many_secondaries => Operation::SubtractByIntersection,
        SetOp::MinusPrimary | SetOp::MinusMultiset => Operation::Subtract,
        SetOp::IntersectionPrimary if many_secondaries => Operation::IntersectWithUnion,
        SetOp::IntersectionPrimary | SetOp::IntersectionMultiset => Operation::Intersect,
        SetOp::UnionDistinct => Operation::Union,
        SetOp::UnionAll => Operation::Merge,
    };

    // Describe the relation according to the classified operation.
    match operation {
        Operation::Invalid => {
            describe!(y, Relation, "Invalid set operation");
        }
        Operation::Subtract => {
            describe!(y, Relation, "Set subtraction");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist \
                 in the second dataset."
            );
        }
        Operation::SubtractByUnion => {
            describe!(y, Relation, "Set subtract by union");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist \
                 in any of the other datasets."
            );
        }
        Operation::SubtractByIntersection => {
            describe!(y, Relation, "Set subtract by intersection");
            summary!(
                y,
                "Yields all rows from the first dataset that do not exist in \
                 all of the other datasets."
            );
        }
        Operation::Intersect => {
            describe!(y, Relation, "Set intersection");
            summary!(
                y,
                "Yields all rows from the first dataset that exist in all \
                 datasets."
            );
        }
        Operation::IntersectWithUnion => {
            describe!(y, Relation, "Set intersect with union");
            summary!(
                y,
                "Yields all rows from the first dataset that exist in any of \
                 the other datasets."
            );
        }
        Operation::Union => {
            describe!(y, Relation, "Set union");
            summary!(
                y,
                "Yields all rows that exist in any dataset, removing duplicates."
            );
        }
        Operation::Merge => {
            describe!(y, Relation, "Merge");
            summary!(y, "Yields all rows from all incoming datasets.");
        }
    };

    // Handle the fields shared by all relations, then the extension.
    handle_rel_common!(x, y);
    handle_advanced_extension!(x, y);
    Ok(())
}