declarative_dataflow/plan/
transform.rs

1//! Function expression plan.
2
3use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8
9use crate::binding::{AsBinding, Binding};
10use crate::plan::{Dependencies, ImplContext, Implementable};
11use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};
12
/// Permitted functions.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Function {
    /// Truncates a unix timestamp down to the start of its interval.
    ///
    /// The interval is given as a string constant in the second argument
    /// position: one of `:minute`, `:hour`, `:day` or `:week` (lengths are
    /// expressed in milliseconds). When no interval constant is given it
    /// defaults to `:hour`.
    TRUNCATE,
    /// Sums all provided numbers, both variable and constant arguments.
    ADD,
    /// Subtracts all other provided numbers from the first argument (the
    /// minuend), which may be either a variable or a constant.
    SUBTRACT,
}
23
/// A plan stage applying a built-in function to source tuples.
/// Frontends are responsible for ensuring that the source
/// binds the argument variables and that the result is projected onto
/// the right variable.
///
/// The function's result is appended as an extra column to every source
/// tuple and bound to `result_variable`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Transform<P: Implementable> {
    /// Variables whose values are passed to the function, in argument
    /// order. Each one must be bound by `plan`.
    pub variables: Vec<Var>,
    /// Variable to which the result of the transformation is bound
    pub result_variable: Var,
    /// Plan for the data source
    pub plan: Box<P>,
    /// Function to apply
    pub function: Function,
    /// Constant inputs, indexed by argument position. Presumably `Some` for
    /// constant arguments and `None` for variable ones — NOTE(review):
    /// confirm this convention against the frontends.
    pub constants: Vec<Option<Value>>,
}
41
42impl<P: Implementable> Implementable for Transform<P> {
43    fn dependencies(&self) -> Dependencies {
44        self.plan.dependencies()
45    }
46
47    fn into_bindings(&self) -> Vec<Binding> {
48        self.plan.into_bindings()
49    }
50
51    fn implement<'b, T, I, S>(
52        &self,
53        nested: &mut Iterative<'b, S, u64>,
54        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
55        context: &mut I,
56    ) -> (Implemented<'b, S>, ShutdownHandle)
57    where
58        T: Timestamp + Lattice,
59        I: ImplContext<T>,
60        S: Scope<Timestamp = T>,
61    {
62        let (relation, mut shutdown_handle) =
63            self.plan.implement(nested, local_arrangements, context);
64
65        let key_offsets: Vec<usize> = self
66            .variables
67            .iter()
68            .map(|variable| relation.binds(*variable).expect("variable not found"))
69            .collect();
70
71        let mut variables = relation.variables();
72        variables.push(self.result_variable);
73
74        let constants_local = self.constants.clone();
75
76        let tuples = {
77            let (tuples, shutdown) = relation.tuples(nested, context);
78            shutdown_handle.merge_with(shutdown);
79            tuples
80        };
81
82        let transformed = match self.function {
83            Function::TRUNCATE => CollectionRelation {
84                variables,
85                tuples: tuples.map(move |tuple| {
86                    let mut t = match tuple[key_offsets[0]] {
87                        Value::Instant(inst) => inst as u64,
88                        _ => panic!("TRUNCATE can only be applied to timestamps"),
89                    };
90                    let default_interval = String::from(":hour");
91                    let interval_param = match constants_local[1].clone() {
92                        Some(Value::String(interval)) => interval,
93                        None => default_interval,
94                        _ => panic!("Parameter for TRUNCATE must be a string"),
95                    };
96
97                    let mod_val = match interval_param.as_ref() {
98                        ":minute" => 60000,
99                        ":hour" => 3_600_000,
100                        ":day" => 86_400_000,
101                        ":week" => 604_800_000,
102                        _ => panic!("Unknown interval for TRUNCATE"),
103                    };
104
105                    t = t - (t % mod_val);
106                    let mut v = tuple.clone();
107                    v.push(Value::Instant(t));
108                    v
109                }),
110            },
111            Function::ADD => CollectionRelation {
112                variables,
113                tuples: tuples.map(move |tuple| {
114                    let mut result = 0;
115
116                    // summands (vars)
117                    for offset in &key_offsets {
118                        let summand = match tuple[*offset] {
119                            Value::Number(s) => s as i64,
120                            _ => panic!("ADD can only be applied to numbers"),
121                        };
122
123                        result += summand;
124                    }
125
126                    // summands (constants)
127                    for arg in &constants_local {
128                        if let Some(constant) = arg {
129                            let summand = match constant {
130                                Value::Number(s) => *s as i64,
131                                _ => panic!("ADD can only be applied to numbers"),
132                            };
133
134                            result += summand;
135                        }
136                    }
137
138                    let mut v = tuple.clone();
139                    v.push(Value::Number(result));
140                    v
141                }),
142            },
143            Function::SUBTRACT => CollectionRelation {
144                variables,
145                tuples: tuples.map(move |tuple| {
146                    // minuend is either variable or variable, depending on
147                    // position in transform
148
149                    let mut result = match constants_local[0].clone() {
150                        Some(constant) => match constant {
151                            Value::Number(minuend) => minuend as i64,
152                            _ => panic!("SUBTRACT can only be applied to numbers"),
153                        },
154                        None => match tuple[key_offsets[0]] {
155                            Value::Number(minuend) => minuend as i64,
156                            _ => panic!("SUBTRACT can only be applied to numbers"),
157                        },
158                    };
159
160                    // avoid filtering out the minuend by doubling it
161                    result = result + result;
162
163                    // subtrahends (vars)
164                    for offset in &key_offsets {
165                        let subtrahend = match tuple[*offset] {
166                            Value::Number(s) => s as i64,
167                            _ => panic!("SUBTRACT can only be applied to numbers"),
168                        };
169
170                        result -= subtrahend;
171                    }
172
173                    // subtrahends (constants)
174                    for arg in &constants_local {
175                        if let Some(constant) = arg {
176                            let subtrahend = match constant {
177                                Value::Number(s) => *s as i64,
178                                _ => panic!("SUBTRACT can only be applied to numbers"),
179                            };
180
181                            result -= subtrahend;
182                        }
183                    }
184
185                    let mut v = tuple.clone();
186                    v.push(Value::Number(result));
187                    v
188                }),
189            },
190        };
191
192        (Implemented::Collection(transformed), shutdown_handle)
193    }
194}