declarative_dataflow/operators/
mod.rs1use timely::dataflow::channels::pact::Pipeline;
5use timely::dataflow::operators::aggregation::StateMachine;
6use timely::dataflow::operators::{generic::operator::Operator, Map};
7use timely::dataflow::Scope;
8
9use differential_dataflow::lattice::Lattice;
10use differential_dataflow::operators::arrange::{Arrange, Arranged};
11use differential_dataflow::trace::{cursor::Cursor, BatchReader};
12use differential_dataflow::{AsCollection, Collection};
13
14use crate::{TraceValHandle, Value};
15
16pub trait CardinalityOne<S: Scope> {
18 fn cardinality_one(&self) -> Collection<S, (Value, Value), isize>;
23}
24
25impl<S> CardinalityOne<S> for Collection<S, (Value, Value), isize>
26where
27 S: Scope,
28 S::Timestamp: Lattice + Ord,
29{
30 fn cardinality_one(&self) -> Collection<S, (Value, Value), isize> {
31 use differential_dataflow::hashable::Hashable;
32
33 let arranged: Arranged<S, TraceValHandle<Value, Value, S::Timestamp, isize>> =
34 self.arrange();
35
36 arranged
37 .stream
38 .unary(Pipeline, "AsCollection", move |_, _| {
39 move |input, output| {
40 input.for_each(|time, data| {
41 let mut session = output.session(&time);
42 for wrapper in data.iter() {
43 let batch = &wrapper;
44 let mut cursor = batch.cursor();
45 while let Some(key) = cursor.get_key(batch) {
46 let mut tuples = Vec::new();
47 while let Some(val) = cursor.get_val(batch) {
48 cursor.map_times(batch, |time, diff| {
49 tuples.push((
50 (key.clone(), val.clone()),
51 time.clone(),
52 diff.clone(),
53 ));
54 });
55 cursor.step_val(batch);
56 }
57
58 tuples.sort_by_key(|(_, ref t, _)| t.clone());
59 session.give_iterator(tuples.drain(..));
60
61 cursor.step_key(batch);
62 }
63 }
64 });
65 }
66 })
67 .map(
68 |((e, next_v), t, diff): ((Value, Value), S::Timestamp, isize)| {
69 (e, (next_v, t, diff))
70 },
71 )
72 .state_machine(
73 |e, (next_v, t, diff), v| {
74 match v {
75 None => {
76 assert!(
77 diff > 0,
78 "Received a retraction of a new key on a CardinalityOne attribute"
79 );
80 *v = Some(next_v.clone());
81 (false, vec![((e.clone(), next_v), t, 1)])
82 }
83 Some(old_v) => {
84 let old_v = old_v.clone();
85 if diff > 0 {
86 *v = Some(next_v.clone());
87 (
88 false,
89 vec![
90 ((e.clone(), old_v), t.clone(), -1),
91 ((e.clone(), next_v), t, 1),
92 ],
93 )
94 } else {
95 (true, vec![((e.clone(), old_v), t, -1)])
97 }
98 }
99 }
100 },
101 |e| e.hashed(),
102 )
103 .as_collection()
104 }
105}