// differential_dataflow/operators/arrange/upsert.rs

//! Support for forming collections from streams of upserts.
//!
//! Upserts are sequences of keyed optional values, and they define a collection of
//! the pairs of keys and each's most recent value, if it is present. Each element in the
//! sequence effectively overwrites the previous value at the key, if present, and if
//! the value is not present it uninstalls the key.
//!
//! Upserts are non-trivial because they do not themselves describe the deletions that
//! the `Collection` update stream must present. However, if one creates an `Arrangement`
//! then this state provides sufficient information. The arrangement will continue to
//! exist even if dropped until the input or dataflow shuts down, as the upsert operator
//! itself needs access to its accumulated state.
//!
//! # Notes
//!
//! Upserts currently only work with totally ordered timestamps.
//!
//! In the case of ties in timestamps (concurrent updates to the same key) they choose
//! the *greatest* value according to `Option<Val>` ordering, which will prefer a value
//! to `None` and choose the greatest value (informally, as if applied in order of value).
//!
//! If the same value is repeated, no change will occur in the output. That may make this
//! operator effective at determining the difference between collections of keyed values,
//! but note that it will not notice the absence of keys in a collection.
//!
//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
//! the input stream, mapping values that fail the predicate to `None` values, like so:
//!
//! ```ignore
//! // Dropped values should be retained as "uninstall" upserts.
//! upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
//! ```
//!
//! # Example
//!
//! ```rust
//! // define a new timely dataflow computation.
//! timely::execute_from_args(std::env::args().skip(1), move |worker| {
//!
//!     type Key = String;
//!     type Val = String;
//!
//!     let mut input = timely::dataflow::InputHandle::new();
//!     let mut probe = timely::dataflow::ProbeHandle::new();
//!
//!     // Create a dataflow demonstrating upserts.
//!     //
//!     // Upserts are a sequence of records (key, option<val>) where the intended
//!     // value associated with a key is the most recent value, and if that is a
//!     // `none` then the key is removed (until a new value shows up).
//!     //
//!     // The challenge with upserts is that the value to *retract* isn't supplied
//!     // as part of the input stream. We have to determine what it should be!
//!
//!     worker.dataflow(|scope| {
//!
//!         use timely::dataflow::operators::Input;
//!         use differential_dataflow::trace::implementations::ValSpine;
//!         use differential_dataflow::operators::arrange::upsert;
//!
//!         let stream = scope.input_from(&mut input);
//!         let arranged = upsert::arrange_from_upsert::<_, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//!         arranged
//!             .as_collection(|k,v| (k.clone(), v.clone()))
//!             .inspect(|x| println!("Observed: {:?}", x))
//!             .probe_with(&mut probe);
//!     });
//!
//!     // Introduce the key, with a specific value.
//!     input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
//!     input.advance_to(4);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Change the value to a different value.
//!     input.send(("frank".to_string(), Some("zappa".to_string()), 4));
//!     input.advance_to(5);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Remove the key and its value.
//!     input.send(("frank".to_string(), None, 5));
//!     input.advance_to(9);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Introduce a new totally different value
//!     input.send(("frank".to_string(), Some("oz".to_string()), 9));
//!     input.advance_to(10);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Repeat the value, which should produce no output.
//!     input.send(("frank".to_string(), Some("oz".to_string()), 11));
//!     input.advance_to(12);
//!     while probe.less_than(input.time()) { worker.step(); }
//!     // Remove the key and value.
//!     input.send(("frank".to_string(), None, 15));
//!     input.close();
//!
//! }).unwrap();
//! ```

use std::collections::{BinaryHeap, HashMap};

use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::Timestamp;
use timely::progress::Antichain;

use crate::lattice::Lattice;
use crate::operators::arrange::arrangement::Arranged;
use crate::trace::Builder;
use crate::trace::{self, Trace, TraceReader, Batch, Cursor};
use crate::{ExchangeData, Hashable};

use super::TraceAgent;

119/// Arrange data from a stream of keyed upserts.
120///
121/// The input should be a stream of timestamped pairs of Key and Option<Val>.
122/// The contents of the collection are defined key-by-key, where each optional
123/// value in sequence either replaces or removes the existing value, should it
124/// exist.
125///
126/// This method is only implemented for totally ordered times, as we do not yet
127/// understand what a "sequence" of upserts would mean for partially ordered
128/// timestamps.
129pub fn arrange_from_upsert<G, Tr>(
130    stream: &Stream<G, (Tr::KeyOwned, Option<Tr::ValOwned>, G::Timestamp)>,
131    name: &str,
132) -> Arranged<G, TraceAgent<Tr>>
133where
134    G: Scope,
135    G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
136    Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash,
137    Tr::ValOwned: ExchangeData,
138    Tr: Trace+TraceReader<Time=G::Timestamp,Diff=isize>+'static,
139    Tr::Batch: Batch,
140    Tr::Builder: Builder<Item = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
141{
142    let mut reader: Option<TraceAgent<Tr>> = None;
143
144    // fabricate a data-parallel operator using the `unary_notify` pattern.
145    let stream = {
146
147        let reader = &mut reader;
148
149        let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option<Tr::ValOwned>,G::Timestamp)| (update.0).hashed().into());
150
151        stream.unary_frontier(exchange, name, move |_capability, info| {
152
153            // Acquire a logger for arrange events.
154            let logger = {
155                let scope = stream.scope();
156                let register = scope.log_register();
157                register.get::<crate::logging::DifferentialEvent>("differential/arrange")
158            };
159
160            // Tracks the lower envelope of times in `priority_queue`.
161            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
162            let mut buffer = Vec::new();
163            // Form the trace we will both use internally and publish.
164            let activator = Some(stream.scope().activator_for(&info.address[..]));
165            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
166
167            if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
168                empty_trace.set_exert_logic(exert_logic);
169            }
170
171            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
172            // Capture the reader outside the builder scope.
173            *reader = Some(reader_local.clone());
174
175            // Tracks the input frontier, used to populate the lower bound of new batches.
176            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
177
178            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
179            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwned, Option<Tr::ValOwned>)>>::new();
180            let mut updates = Vec::new();
181
182            move |input, output| {
183
184                // Stash capabilities and associated data (ordered by time).
185                input.for_each(|cap, data| {
186                    capabilities.insert(cap.retain());
187                    data.swap(&mut buffer);
188                    for (key, val, time) in buffer.drain(..) {
189                        priority_queue.push(std::cmp::Reverse((time, key, val)))
190                    }
191                });
192
193                // Assert that the frontier never regresses.
194                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
195
196                // Test to see if strict progress has occurred, which happens whenever the new
197                // frontier isn't equal to the previous. It is only in this case that we have any
198                // data processing to do.
199                if prev_frontier.borrow() != input.frontier().frontier() {
200
201                    // If there is at least one capability not in advance of the input frontier ...
202                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
203
204                        let mut upper = Antichain::new();   // re-used allocation for sealing batches.
205
206                        // For each capability not in advance of the input frontier ...
207                        for (index, capability) in capabilities.elements().iter().enumerate() {
208
209                            if !input.frontier().less_equal(capability.time()) {
210
211                                // Assemble the upper bound on times we can commit with this capabilities.
212                                // We must respect the input frontier, and *subsequent* capabilities, as
213                                // we are pretending to retire the capability changes one by one.
214                                upper.clear();
215                                for time in input.frontier().frontier().iter() {
216                                    upper.insert(time.clone());
217                                }
218                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
219                                    upper.insert(other_capability.time().clone());
220                                }
221
222                                // Extract upserts available to process as of this `upper`.
223                                let mut to_process = HashMap::new();
224                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
225                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
226                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
227                                }
228                                // Reduce the allocation behind the priority queue if it is presently excessive.
229                                // A factor of four is used to avoid repeated doubling and shrinking.
230                                // TODO: if the queue were a sequence of geometrically sized allocations, we could
231                                // shed the additional capacity without copying any data.
232                                if priority_queue.capacity() > 4 * priority_queue.len() {
233                                    priority_queue.shrink_to_fit();
234                                }
235
236                                // Put (key, list) into key order, to match cursor enumeration.
237                                let mut to_process = to_process.into_iter().collect::<Vec<_>>();
238                                to_process.sort();
239
240                                // Prepare a cursor to the existing arrangement, and a batch builder for
241                                // new stuff that we add.
242                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
243                                let mut builder = Tr::Builder::new();
244                                for (key, mut list) in to_process.drain(..) {
245
246                                    use trace::cursor::MyTrait;
247
248                                    // The prior value associated with the key.
249                                    let mut prev_value: Option<Tr::ValOwned> = None;
250
251                                    // Attempt to find the key in the trace.
252                                    trace_cursor.seek_key_owned(&trace_storage, &key);
253                                    if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) {
254                                        // Determine the prior value associated with the key.
255                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
256                                            let mut count = 0;
257                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
258                                            assert!(count == 0 || count == 1);
259                                            if count == 1 {
260                                                assert!(prev_value.is_none());
261                                                prev_value = Some(val.into_owned());
262                                            }
263                                            trace_cursor.step_val(&trace_storage);
264                                        }
265                                        trace_cursor.step_key(&trace_storage);
266                                    }
267
268                                    // Sort the list of upserts to `key` by their time, suppress multiple updates.
269                                    list.sort();
270                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
271                                    for (time, std::cmp::Reverse(next)) in list {
272                                        if prev_value != next {
273                                            if let Some(prev) = prev_value {
274                                                updates.push(((key.clone(), prev), time.clone(), -1));
275                                            }
276                                            if let Some(next) = next.as_ref() {
277                                                updates.push(((key.clone(), next.clone()), time.clone(), 1));
278                                            }
279                                            prev_value = next;
280                                        }
281                                    }
282                                    // Must insert updates in (key, val, time) order.
283                                    updates.sort();
284                                    for update in updates.drain(..) {
285                                        builder.push(update);
286                                    }
287                                }
288                                let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
289                                prev_frontier.clone_from(&upper);
290
291                                // Communicate `batch` to the arrangement and the stream.
292                                writer.insert(batch.clone(), Some(capability.time().clone()));
293                                output.session(&capabilities.elements()[index]).give(batch);
294                            }
295                        }
296
297                        // Having extracted and sent batches between each capability and the input frontier,
298                        // we should downgrade all capabilities to match the batcher's lower update frontier.
299                        // This may involve discarding capabilities, which is fine as any new updates arrive
300                        // in messages with new capabilities.
301
302                        let mut new_capabilities = Antichain::new();
303                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
304                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
305                                new_capabilities.insert(capability.delayed(time));
306                            }
307                            else {
308                                panic!("failed to find capability");
309                            }
310                        }
311
312                        capabilities = new_capabilities;
313                    }
314                    else {
315                        // Announce progress updates, even without data.
316                        writer.seal(input.frontier().frontier().to_owned());
317                    }
318
319                    // Update our view of the input frontier.
320                    prev_frontier.clear();
321                    prev_frontier.extend(input.frontier().frontier().iter().cloned());
322
323                    // Downgrade capabilities for `reader_local`.
324                    reader_local.set_logical_compaction(prev_frontier.borrow());
325                    reader_local.set_physical_compaction(prev_frontier.borrow());
326                }
327
328                writer.exert();
329            }
330        })
331    };
332
333    Arranged { stream, trace: reader.unwrap() }
334
335}