declarative_dataflow/operators/
mod.rs

1//! Extension traits for `Stream` implementing various
2//! declarative-specific operators.
3
4use timely::dataflow::channels::pact::Pipeline;
5use timely::dataflow::operators::aggregation::StateMachine;
6use timely::dataflow::operators::{generic::operator::Operator, Map};
7use timely::dataflow::Scope;
8
9use differential_dataflow::lattice::Lattice;
10use differential_dataflow::operators::arrange::{Arrange, Arranged};
11use differential_dataflow::trace::{cursor::Cursor, BatchReader};
12use differential_dataflow::{AsCollection, Collection};
13
14use crate::{TraceValHandle, Value};
15
16/// Provides the `cardinality_one` method.
17pub trait CardinalityOne<S: Scope> {
18    /// Ensures that only a single value per eid exists within an
19    /// attribute, by retracting any previous values upon new
20    /// updates. Therefore this stream does not expect explicit
21    /// retractions.
22    fn cardinality_one(&self) -> Collection<S, (Value, Value), isize>;
23}
24
25impl<S> CardinalityOne<S> for Collection<S, (Value, Value), isize>
26where
27    S: Scope,
28    S::Timestamp: Lattice + Ord,
29{
30    fn cardinality_one(&self) -> Collection<S, (Value, Value), isize> {
31        use differential_dataflow::hashable::Hashable;
32
33        let arranged: Arranged<S, TraceValHandle<Value, Value, S::Timestamp, isize>> =
34            self.arrange();
35
36        arranged
37            .stream
38            .unary(Pipeline, "AsCollection", move |_, _| {
39                move |input, output| {
40                    input.for_each(|time, data| {
41                        let mut session = output.session(&time);
42                        for wrapper in data.iter() {
43                            let batch = &wrapper;
44                            let mut cursor = batch.cursor();
45                            while let Some(key) = cursor.get_key(batch) {
46                                let mut tuples = Vec::new();
47                                while let Some(val) = cursor.get_val(batch) {
48                                    cursor.map_times(batch, |time, diff| {
49                                        tuples.push((
50                                            (key.clone(), val.clone()),
51                                            time.clone(),
52                                            diff.clone(),
53                                        ));
54                                    });
55                                    cursor.step_val(batch);
56                                }
57
58                                tuples.sort_by_key(|(_, ref t, _)| t.clone());
59                                session.give_iterator(tuples.drain(..));
60
61                                cursor.step_key(batch);
62                            }
63                        }
64                    });
65                }
66            })
67            .map(
68                |((e, next_v), t, diff): ((Value, Value), S::Timestamp, isize)| {
69                    (e, (next_v, t, diff))
70                },
71            )
72            .state_machine(
73                |e, (next_v, t, diff), v| {
74                    match v {
75                        None => {
76                            assert!(
77                                diff > 0,
78                                "Received a retraction of a new key on a CardinalityOne attribute"
79                            );
80                            *v = Some(next_v.clone());
81                            (false, vec![((e.clone(), next_v), t, 1)])
82                        }
83                        Some(old_v) => {
84                            let old_v = old_v.clone();
85                            if diff > 0 {
86                                *v = Some(next_v.clone());
87                                (
88                                    false,
89                                    vec![
90                                        ((e.clone(), old_v), t.clone(), -1),
91                                        ((e.clone(), next_v), t, 1),
92                                    ],
93                                )
94                            } else {
95                                // Retraction received. Can clean up state.
96                                (true, vec![((e.clone(), old_v), t, -1)])
97                            }
98                        }
99                    }
100                },
101                |e| e.hashed(),
102            )
103            .as_collection()
104    }
105}