declarative_dataflow/plan/
transform.rs1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8
9use crate::binding::{AsBinding, Binding};
10use crate::plan::{Dependencies, ImplContext, Implementable};
11use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};
12
13#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
15pub enum Function {
16 TRUNCATE,
18 ADD,
20 SUBTRACT,
22}
23
24#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
29pub struct Transform<P: Implementable> {
30 pub variables: Vec<Var>,
32 pub result_variable: Var,
34 pub plan: Box<P>,
36 pub function: Function,
38 pub constants: Vec<Option<Value>>,
40}
41
42impl<P: Implementable> Implementable for Transform<P> {
43 fn dependencies(&self) -> Dependencies {
44 self.plan.dependencies()
45 }
46
47 fn into_bindings(&self) -> Vec<Binding> {
48 self.plan.into_bindings()
49 }
50
51 fn implement<'b, T, I, S>(
52 &self,
53 nested: &mut Iterative<'b, S, u64>,
54 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
55 context: &mut I,
56 ) -> (Implemented<'b, S>, ShutdownHandle)
57 where
58 T: Timestamp + Lattice,
59 I: ImplContext<T>,
60 S: Scope<Timestamp = T>,
61 {
62 let (relation, mut shutdown_handle) =
63 self.plan.implement(nested, local_arrangements, context);
64
65 let key_offsets: Vec<usize> = self
66 .variables
67 .iter()
68 .map(|variable| relation.binds(*variable).expect("variable not found"))
69 .collect();
70
71 let mut variables = relation.variables();
72 variables.push(self.result_variable);
73
74 let constants_local = self.constants.clone();
75
76 let tuples = {
77 let (tuples, shutdown) = relation.tuples(nested, context);
78 shutdown_handle.merge_with(shutdown);
79 tuples
80 };
81
82 let transformed = match self.function {
83 Function::TRUNCATE => CollectionRelation {
84 variables,
85 tuples: tuples.map(move |tuple| {
86 let mut t = match tuple[key_offsets[0]] {
87 Value::Instant(inst) => inst as u64,
88 _ => panic!("TRUNCATE can only be applied to timestamps"),
89 };
90 let default_interval = String::from(":hour");
91 let interval_param = match constants_local[1].clone() {
92 Some(Value::String(interval)) => interval,
93 None => default_interval,
94 _ => panic!("Parameter for TRUNCATE must be a string"),
95 };
96
97 let mod_val = match interval_param.as_ref() {
98 ":minute" => 60000,
99 ":hour" => 3_600_000,
100 ":day" => 86_400_000,
101 ":week" => 604_800_000,
102 _ => panic!("Unknown interval for TRUNCATE"),
103 };
104
105 t = t - (t % mod_val);
106 let mut v = tuple.clone();
107 v.push(Value::Instant(t));
108 v
109 }),
110 },
111 Function::ADD => CollectionRelation {
112 variables,
113 tuples: tuples.map(move |tuple| {
114 let mut result = 0;
115
116 for offset in &key_offsets {
118 let summand = match tuple[*offset] {
119 Value::Number(s) => s as i64,
120 _ => panic!("ADD can only be applied to numbers"),
121 };
122
123 result += summand;
124 }
125
126 for arg in &constants_local {
128 if let Some(constant) = arg {
129 let summand = match constant {
130 Value::Number(s) => *s as i64,
131 _ => panic!("ADD can only be applied to numbers"),
132 };
133
134 result += summand;
135 }
136 }
137
138 let mut v = tuple.clone();
139 v.push(Value::Number(result));
140 v
141 }),
142 },
143 Function::SUBTRACT => CollectionRelation {
144 variables,
145 tuples: tuples.map(move |tuple| {
146 let mut result = match constants_local[0].clone() {
150 Some(constant) => match constant {
151 Value::Number(minuend) => minuend as i64,
152 _ => panic!("SUBTRACT can only be applied to numbers"),
153 },
154 None => match tuple[key_offsets[0]] {
155 Value::Number(minuend) => minuend as i64,
156 _ => panic!("SUBTRACT can only be applied to numbers"),
157 },
158 };
159
160 result = result + result;
162
163 for offset in &key_offsets {
165 let subtrahend = match tuple[*offset] {
166 Value::Number(s) => s as i64,
167 _ => panic!("SUBTRACT can only be applied to numbers"),
168 };
169
170 result -= subtrahend;
171 }
172
173 for arg in &constants_local {
175 if let Some(constant) = arg {
176 let subtrahend = match constant {
177 Value::Number(s) => *s as i64,
178 _ => panic!("SUBTRACT can only be applied to numbers"),
179 };
180
181 result -= subtrahend;
182 }
183 }
184
185 let mut v = tuple.clone();
186 v.push(Value::Number(result));
187 v
188 }),
189 },
190 };
191
192 (Implemented::Collection(transformed), shutdown_handle)
193 }
194}