Skip to main content

palimpsest_dataflow/
capture.rs

1//! Logic related to capture and replay of differential collections.
2//!
3//! This module defines a protocol for capturing and replaying differential collections
4//! to streaming storage that may both duplicate and reorder messages. It records facts
5//! about the collection that once true stay true, such as the exact changes data undergo
6//! at each time, and the number of distinct updates at each time.
7//!
8//! The methods are parameterized by implementors of byte sources and byte sinks. For
9//! example implementations of these traits, consult the commented text at the end of
10//! this file.
11
12use serde::{Deserialize, Serialize};
13use std::time::Duration;
14
15/// A message in the CDC V2 protocol.
16#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
17pub enum Message<D, T, R> {
18    /// A batch of updates that are certain to occur.
19    ///
20    /// Each triple is an irrevocable statement about a change that occurs.
21    /// Each statement contains a datum, a time, and a difference, and asserts
22    /// that the multiplicity of the datum changes at the time by the difference.
23    Updates(Vec<(D, T, R)>),
24    /// An irrevocable statement about the number of updates within a time interval.
25    Progress(Progress<T>),
26}
27
28/// An irrevocable statement about the number of updates at times within an interval.
29///
30/// This statement covers all times beyond `lower` and not beyond `upper`.
31/// Each element of `counts` is an irrevocable statement about the exact number of
32/// distinct updates that occur at that time.
33/// Times not present in `counts` have a count of zero.
34#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
35pub struct Progress<T> {
36    /// The lower bound of times contained in this statement.
37    pub lower: Vec<T>,
38    /// The upper bound of times contained in this statement.
39    pub upper: Vec<T>,
40    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
41    pub counts: Vec<(T, usize)>,
42}
43
/// A sink that accepts items of type `T`, one at a time.
///
/// The trait is agnostic to the item type; in this module it is instantiated with
/// `Message<D, T, R>` items by `sink::build`. Implementors decide how (and whether)
/// items are encoded before they reach durable storage.
pub trait Writer<T> {
    /// Attempts to hand `item` to the sink.
    ///
    /// Returns `None` once the item has been accepted, or `Some(delay)` to ask the
    /// caller to wait at least `delay` before offering the same item again.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
51
52/// A deduplicating, re-ordering iterator.
53pub mod iterator {
54
55    use super::{Message, Progress};
56    use crate::lattice::Lattice;
57    use std::hash::Hash;
58    use timely::order::PartialOrder;
59    use timely::progress::{
60        frontier::{AntichainRef, MutableAntichain},
61        Antichain, Timestamp,
62    };
63
64    /// A direct implementation of a deduplicating, re-ordering iterator.
65    ///
66    /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
67    /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
68    /// footprint, proportional to the mismatch between the received updates and progress messages.
69    pub struct Iter<I, D, T, R>
70    where
71        I: Iterator<Item = Message<D, T, R>>,
72        T: Hash + Ord + Lattice + Clone,
73        D: Hash + Eq,
74        T: Hash + Eq,
75        R: Hash + Eq,
76    {
77        /// Source of potentially duplicated, out of order cdc_v2 messages.
78        iterator: I,
79        /// Updates that have been received, but are still beyond `reported_frontier`.
80        ///
81        /// These updates are retained both so that they can eventually be transmitted,
82        /// but also so that they can deduplicate updates that may still be received.
83        updates: std::collections::HashSet<(D, T, R)>,
84        /// Frontier through which the iterator has reported updates.
85        ///
86        /// All updates not beyond this frontier have been reported.
87        /// Any information related to times not beyond this frontier can be discarded.
88        ///
89        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
90        /// our two bounds on potential uncertainty in progress and update messages.
91        reported_frontier: Antichain<T>,
92        /// Frontier of accepted progress statements.
93        ///
94        /// All progress message counts for times not beyond this frontier have been
95        /// incorporated in to `messages_frontier`. This frontier also guides which
96        /// received progress statements can be incorporated: those whose for which
97        /// this frontier is beyond their lower bound.
98        progress_frontier: Antichain<T>,
99        /// Counts of outstanding messages at times.
100        ///
101        /// These counts track the difference between message counts at times announced
102        /// by progress messages, and message counts at times received in distinct updates.
103        messages_frontier: MutableAntichain<T>,
104        /// Progress statements that are not yet actionable due to out-of-Iterness.
105        ///
106        /// A progress statement becomes actionable once the progress frontier is beyond
107        /// its lower frontier. This ensures that the [0, lower) interval is already
108        /// incorporated, and that we will not leave a gap by incorporating the counts
109        /// and reflecting the progress statement's upper frontier.
110        progress_queue: Vec<Progress<T>>,
111    }
112
113    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
114    where
115        I: Iterator<Item = Message<D, T, R>>,
116        T: Hash + Ord + Lattice + Clone,
117        D: Hash + Eq + Clone,
118        R: Hash + Eq + Clone,
119    {
120        type Item = (Vec<(D, T, R)>, Antichain<T>);
121        fn next(&mut self) -> Option<Self::Item> {
122            // Each call to `next` should return some newly carved interval of time.
123            // As such, we should read from our source until we find such a thing.
124            //
125            // An interval can be completed once our frontier of received progress
126            // information and our frontier of unresolved counts have advanced.
127            while let Some(message) = self.iterator.next() {
128                match message {
129                    Message::Updates(mut updates) => {
130                        // Discard updates at reported times, or duplicates at unreported times.
131                        updates.retain(|dtr| {
132                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
133                        });
134                        // Decrement our counts of accounted-for messages.
135                        self.messages_frontier
136                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
137                        // Record the messages in our de-duplication collection.
138                        self.updates.extend(updates.into_iter());
139                    }
140                    Message::Progress(progress) => {
141                        // A progress statement may not be immediately actionable.
142                        self.progress_queue.push(progress);
143                    }
144                }
145
146                // Attempt to drain actionable progress messages.
147                // A progress message is actionable if `self.progress_frontier` is greater or
148                // equal to the message's lower bound.
149                while let Some(position) = self.progress_queue.iter().position(|p| {
150                    <_ as PartialOrder>::less_equal(
151                        &AntichainRef::new(&p.lower),
152                        &self.progress_frontier.borrow(),
153                    )
154                }) {
155                    let mut progress = self.progress_queue.remove(position);
156                    // Discard counts that have already been incorporated.
157                    progress
158                        .counts
159                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
160                    // Record any new reports of expected counts.
161                    self.messages_frontier
162                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
163                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
164                    let mut new_frontier = Antichain::new();
165                    for time1 in progress.upper {
166                        for time2 in self.progress_frontier.elements() {
167                            new_frontier.insert(time1.join(time2));
168                        }
169                    }
170                    self.progress_queue.retain(|p| {
171                        !<_ as PartialOrder>::less_equal(
172                            &AntichainRef::new(&p.upper),
173                            &new_frontier.borrow(),
174                        )
175                    });
176                    self.progress_frontier = new_frontier;
177                }
178
179                // Now check and see if our lower bound exceeds `self.reported_frontier`.
180                let mut lower_bound = self.progress_frontier.clone();
181                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
182                if lower_bound != self.reported_frontier {
183                    let to_publish = self
184                        .updates
185                        .iter()
186                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
187                        .cloned()
188                        .collect::<Vec<_>>();
189                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
190                    self.reported_frontier = lower_bound.clone();
191                    return Some((to_publish, lower_bound));
192                }
193            }
194            None
195        }
196    }
197
198    impl<D, T, R, I> Iter<I, D, T, R>
199    where
200        I: Iterator<Item = Message<D, T, R>>,
201        T: Hash + Ord + Lattice + Clone + Timestamp,
202        D: Hash + Eq + Clone,
203        R: Hash + Eq + Clone,
204    {
205        /// Construct a new re-ordering, deduplicating iterator.
206        pub fn new(iterator: I) -> Self {
207            Self {
208                iterator,
209                updates: std::collections::HashSet::new(),
210                reported_frontier: Antichain::from_elem(T::minimum()),
211                progress_frontier: Antichain::from_elem(T::minimum()),
212                messages_frontier: MutableAntichain::new(),
213                progress_queue: Vec::new(),
214            }
215        }
216    }
217}
218
219/// Methods for recovering update streams from binary bundles.
220pub mod source {
221
222    use super::{Message, Progress};
223    use crate::{lattice::Lattice, ExchangeData};
224    use std::cell::RefCell;
225    use std::hash::Hash;
226    use std::marker::{Send, Sync};
227    use std::rc::Rc;
228    use std::sync::Arc;
229    use timely::dataflow::operators::generic::OutputBuilder;
230    use timely::dataflow::{
231        operators::{Capability, CapabilitySet},
232        Scope, Stream,
233    };
234    use timely::progress::Timestamp;
235    use timely::scheduling::SyncActivator;
236
237    // TODO(guswynn): implement this generally in timely
238    struct DropActivator {
239        activator: Arc<SyncActivator>,
240    }
241
242    impl Drop for DropActivator {
243        fn drop(&mut self) {
244            // Best effort: failure to activate
245            // is ignored
246            let _ = self.activator.activate();
247        }
248    }
249
250    /// Constructs a stream of updates from a source of messages.
251    ///
252    /// The stream is built in the supplied `scope` and continues to run until
253    /// the returned `Box<Any>` token is dropped. The `source_builder` argument
254    /// is invoked with a `SyncActivator` that will re-activate the source.
255    pub fn build<G, B, I, D, T, R>(
256        scope: G,
257        source_builder: B,
258    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
259    where
260        G: Scope<Timestamp = T>,
261        B: FnOnce(SyncActivator) -> I,
262        I: Iterator<Item = Message<D, T, R>> + 'static,
263        D: ExchangeData + Hash,
264        T: ExchangeData + Hash + Timestamp + Lattice,
265        R: ExchangeData + Hash,
266    {
267        // Read messages are either updates or progress messages.
268        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
269        // This includes both emitting updates, and setting expectations for update counts.
270        //
271        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
272        // Progress needs to be deduplicated by time, and we should exchange them by such.
273        //
274        // The first cut of this is a dataflow graph that looks like (flowing downward)
275        //
276        // 1. MESSAGES:
277        //      Reads `Message` stream; maintains capabilities.
278        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
279        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
280        //      Shares capabilities with downstream operator.
281        // 2. UPDATES:
282        //      Maintains and deduplicates updates.
283        //      Ships updates once frontier advances.
284        //      Ships counts to PROGRESS stage, by hash(time).
285        // 3. PROGRESS:
286        //      Maintains outstanding message counts by time. Tracks frontiers.
287        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage
288        // 4. FEEDBACK:
289        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
290        //
291        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as RAW.
292        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS and PROGRESS must broadcast its changes
293        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems. Though, both of these
294        // broadcast things are meant to be very reduced data.
295
296        use crate::hashable::Hashable;
297        use timely::dataflow::channels::pact::Exchange;
298        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
299        use timely::progress::frontier::MutableAntichain;
300        use timely::progress::ChangeBatch;
301
302        // Some message distribution logic depends on the number of workers.
303        let workers = scope.peers();
304
305        let mut token = None;
306        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
307        let mut antichain = MutableAntichain::new();
308        antichain.update_iter(Some((T::minimum(), workers as i64)));
309        let shared_frontier = Rc::new(RefCell::new(antichain));
310        let shared_frontier2 = shared_frontier.clone();
311
312        // Step 1: The MESSAGES operator.
313        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
314        let address = messages_op.operator_info().address;
315        let activator = scope.sync_activator_for(address.to_vec());
316        let activator2 = scope.activator_for(Rc::clone(&address));
317        let drop_activator = DropActivator {
318            activator: Arc::new(scope.sync_activator_for(address.to_vec())),
319        };
320        let mut source = source_builder(activator);
321        let (updates_out, updates) = messages_op.new_output();
322        let mut updates_out = OutputBuilder::from(updates_out);
323        let (progress_out, progress) = messages_op.new_output();
324        let mut progress_out = OutputBuilder::from(progress_out);
325
326        messages_op.build(|capabilities| {
327            // A Weak that communicates whether the returned token has been dropped.
328            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);
329
330            token = Some(drop_activator);
331
332            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
333            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
334            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
335            // Capture the shared frontier to read out frontier updates to apply.
336            let local_frontier = shared_frontier.clone();
337            //
338            move |_frontiers| {
339                // First check to ensure that we haven't been terminated by someone dropping our tokens.
340                if drop_activator_weak.upgrade().is_none() {
341                    // Give up our capabilities
342                    updates_caps.downgrade(&[]);
343                    progress_caps.downgrade(&[]);
344                    // never continue, even if we are (erroneously) activated again.
345                    return;
346                }
347
348                // Consult our shared frontier, and ensure capabilities are downgraded to it.
349                let shared_frontier = local_frontier.borrow();
350                updates_caps.downgrade(&shared_frontier.frontier());
351                progress_caps.downgrade(&shared_frontier.frontier());
352
353                // Next check to see if we have been terminated by the source being complete.
354                if !updates_caps.is_empty() && !progress_caps.is_empty() {
355                    let mut updates = updates_out.activate();
356                    let mut progress = progress_out.activate();
357
358                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
359                    // Specifically, there may not be one capability valid for all updates.
360                    let mut updates_session = updates.session(&updates_caps[0]);
361                    let mut progress_session = progress.session(&progress_caps[0]);
362
363                    // We presume the iterator will yield if appropriate.
364                    for message in source.by_ref() {
365                        match message {
366                            Message::Updates(mut updates) => {
367                                updates_session.give_container(&mut updates);
368                            }
369                            Message::Progress(progress) => {
370                                // We must send a copy of each progress message to all workers,
371                                // but we can partition the counts across workers by timestamp.
372                                let mut to_worker = vec![Vec::new(); workers];
373                                for (time, count) in progress.counts {
374                                    to_worker[(time.hashed() as usize) % workers]
375                                        .push((time, count));
376                                }
377                                for (worker, counts) in to_worker.into_iter().enumerate() {
378                                    progress_session.give((
379                                        worker,
380                                        Progress {
381                                            lower: progress.lower.clone(),
382                                            upper: progress.upper.clone(),
383                                            counts,
384                                        },
385                                    ));
386                                }
387                            }
388                        }
389                    }
390                }
391            }
392        });
393
394        // Step 2: The UPDATES operator.
395        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
396        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
397        let (changes_out, changes) = updates_op.new_output();
398        let mut changes_out = OutputBuilder::from(changes_out);
399        let (counts_out, counts) = updates_op.new_output();
400        let mut counts_out = OutputBuilder::from(counts_out);
401
402        updates_op.build(move |_capability| {
403            // Deduplicates updates, and ships novel updates and the counts for each time.
404            // For simplicity, this operator ships updates as they are discovered to be new.
405            // This has the defect that on load we may have two copies of the data (shipped,
406            // and here for deduplication).
407            //
408            // Filters may be pushed ahead of this operator, but because of deduplication we
409            // may not push projections ahead of this operator (at least, not without fields
410            // that are known to form keys, and even then only carefully).
411            let mut pending = std::collections::HashMap::new();
412            let mut change_batch = ChangeBatch::<T>::new();
413            move |frontiers| {
414                // Thin out deduplication buffer.
415                // This is the moment in a more advanced implementation where we might send
416                // the data for the first time, maintaining only one copy of each update live
417                // at a time in the system.
418                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));
419
420                // Deduplicate newly received updates, sending new updates and timestamp counts.
421                let mut changes = changes_out.activate();
422                let mut counts = counts_out.activate();
423                while let Some((capability, updates)) = input.next() {
424                    let mut changes_session = changes.session(&capability);
425                    let mut counts_session = counts.session(&capability);
426                    for (data, time, diff) in updates.iter() {
427                        if frontiers[0].less_equal(time) {
428                            if let Some(prior) =
429                                pending.insert((data.clone(), time.clone()), diff.clone())
430                            {
431                                assert_eq!(&prior, diff);
432                            } else {
433                                change_batch.update(time.clone(), -1);
434                                changes_session.give((data.clone(), time.clone(), diff.clone()));
435                            }
436                        }
437                    }
438                    if !change_batch.is_empty() {
439                        counts_session.give_iterator(change_batch.drain());
440                    }
441                }
442            }
443        });
444
445        // Step 3: The PROGRESS operator.
446        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
447        let mut input = progress_op.new_input(
448            &progress,
449            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
450        );
451        let mut counts =
452            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
453        let (frontier_out, frontier) = progress_op.new_output();
454        let mut frontier_out = OutputBuilder::from(frontier_out);
455        progress_op.build(move |_capability| {
456            // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.
457
458            use timely::order::PartialOrder;
459            use timely::progress::{frontier::AntichainRef, Antichain};
460
461            let mut progress_queue = Vec::new();
462            let mut progress_frontier = Antichain::from_elem(T::minimum());
463            let mut updates_frontier = MutableAntichain::new();
464            let mut reported_frontier = Antichain::from_elem(T::minimum());
465
466            move |_frontiers| {
467                let mut frontier = frontier_out.activate();
468
469                // If the frontier changes we need a capability to express that.
470                // Any capability should work; the downstream listener doesn't care.
471                let mut capability: Option<Capability<T>> = None;
472
473                // Drain all relevant update counts in to the mutable antichain tracking its frontier.
474                while let Some((cap, counts)) = counts.next() {
475                    updates_frontier.update_iter(counts.iter().cloned());
476                    capability = Some(cap.retain());
477                }
478                // Drain all progress statements into the queue out of which we will work.
479                while let Some((cap, progress)) = input.next() {
480                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
481                    capability = Some(cap.retain());
482                }
483
484                // Extract and act on actionable progress messages.
485                // A progress message is actionable if `self.progress_frontier` is beyond the message's lower bound.
486                while let Some(position) = progress_queue.iter().position(|p| {
487                    <_ as PartialOrder>::less_equal(
488                        &AntichainRef::new(&p.lower),
489                        &progress_frontier.borrow(),
490                    )
491                }) {
492                    // Extract progress statement.
493                    let mut progress = progress_queue.remove(position);
494                    // Discard counts that have already been incorporated.
495                    progress
496                        .counts
497                        .retain(|(time, _count)| progress_frontier.less_equal(time));
498                    // Record any new reports of expected counts.
499                    updates_frontier
500                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
501                    // Extend self.progress_frontier by progress.upper.
502                    let mut new_frontier = Antichain::new();
503                    for time1 in progress.upper {
504                        for time2 in progress_frontier.elements() {
505                            new_frontier.insert(time1.join(time2));
506                        }
507                    }
508                    progress_frontier = new_frontier;
509                }
510
511                // Determine if the lower bound of frontiers have advanced, and transmit updates if so.
512                let mut lower_bound = progress_frontier.clone();
513                lower_bound.extend(updates_frontier.frontier().iter().cloned());
514                if lower_bound != reported_frontier {
515                    let capability =
516                        capability.expect("Changes occurred, without surfacing a capability");
517                    let mut changes = ChangeBatch::new();
518                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
519                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
520                    let mut frontier_session = frontier.session(&capability);
521                    for peer in 0..workers {
522                        frontier_session.give((peer, changes.clone()));
523                    }
524                    reported_frontier = lower_bound.clone();
525                }
526            }
527        });
528
529        // Step 4: The FEEDBACK operator.
530        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
531        let mut input = feedback_op.new_input(
532            &frontier,
533            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
534        );
535        feedback_op.build(move |_capability| {
536            // Receive frontier changes and share the net result with MESSAGES.
537            move |_frontiers| {
538                let mut antichain = shared_frontier2.borrow_mut();
539                let mut must_activate = false;
540                while let Some((_cap, frontier_changes)) = input.next() {
541                    for (_self, input_changes) in frontier_changes.iter() {
542                        // Apply the updates, and observe if the lower bound has changed.
543                        if antichain
544                            .update_iter(input_changes.unstable_internal_updates().iter().cloned())
545                            .next()
546                            .is_some()
547                        {
548                            must_activate = true;
549                        }
550                    }
551                }
552                // If the lower bound has changed, we must activate MESSAGES.
553                if must_activate {
554                    activator2.activate();
555                }
556            }
557        });
558
559        (Box::new(token.unwrap()), changes)
560    }
561}
562
563/// Methods for recording update streams to binary bundles.
564pub mod sink {
565
566    use std::cell::RefCell;
567    use std::hash::Hash;
568    use std::rc::Weak;
569
570    use serde::{Deserialize, Serialize};
571
572    use timely::dataflow::channels::pact::{Exchange, Pipeline};
573    use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
574    use timely::dataflow::{Scope, Stream};
575    use timely::order::PartialOrder;
576    use timely::progress::{Antichain, ChangeBatch, Timestamp};
577
578    use super::{Message, Progress, Writer};
579    use crate::{lattice::Lattice, ExchangeData};
580
581    /// Constructs a sink, for recording the updates in `stream`.
582    ///
583    /// It is crucial that `stream` has been consolidated before this method, which
584    /// will *not* perform the consolidation on the stream's behalf. If this is not
585    /// performed before calling the method, the recorded output may not be correctly
586    /// reconstructed by readers.
587    pub fn build<G, BS, D, T, R>(
588        stream: &Stream<G, (D, T, R)>,
589        sink_hash: u64,
590        updates_sink: Weak<RefCell<BS>>,
591        progress_sink: Weak<RefCell<BS>>,
592    ) where
593        G: Scope<Timestamp = T>,
594        BS: Writer<Message<D, T, R>> + 'static,
595        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
596        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
597        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
598    {
599        // First we record the updates that stream in.
600        // We can simply record all updates, under the presumption that the have been consolidated
601        // and so any record we see is in fact guaranteed to happen.
602        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
603        let reactivator = stream
604            .scope()
605            .activator_for(builder.operator_info().address);
606        let mut input = builder.new_input(stream, Pipeline);
607        let (updates_out, updates) = builder.new_output();
608        let mut updates_out = OutputBuilder::from(updates_out);
609
610        builder.build_reschedule(move |_capability| {
611            let mut timestamps = <ChangeBatch<_>>::new();
612            let mut send_queue = std::collections::VecDeque::new();
613            move |_frontiers| {
614                let mut output = updates_out.activate();
615
616                // We want to drain inputs always...
617                input.for_each(|capability, updates| {
618                    // Write each update out, and record the timestamp.
619                    for (_data, time, _diff) in updates.iter() {
620                        timestamps.update(time.clone(), 1);
621                    }
622
623                    // Now record the update to the writer.
624                    send_queue.push_back(Message::Updates(std::mem::take(updates)));
625
626                    // Transmit timestamp counts downstream.
627                    output
628                        .session(&capability)
629                        .give_iterator(timestamps.drain());
630                });
631
632                // Drain whatever we can from the queue of bytes to send.
633                // ... but needn't do anything more if our sink is closed.
634                if let Some(sink) = updates_sink.upgrade() {
635                    let mut sink = sink.borrow_mut();
636                    while let Some(message) = send_queue.front() {
637                        if let Some(duration) = sink.poll(message) {
638                            // Reschedule after `duration` and then bail.
639                            reactivator.activate_after(duration);
640                            return true;
641                        } else {
642                            send_queue.pop_front();
643                        }
644                    }
645                    // Signal incompleteness if messages remain to be sent.
646                    !sink.done() || !send_queue.is_empty()
647                } else {
648                    // We have been terminated, but may still receive indefinite data.
649                    send_queue.clear();
650                    // Signal that there are no outstanding writes.
651                    false
652                }
653            }
654        });
655
656        // We use a lower-level builder here to get access to the operator address, for rescheduling.
657        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
658        let reactivator = stream
659            .scope()
660            .activator_for(builder.operator_info().address);
661        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
662        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();
663
664        // We now record the numbers of updates at each timestamp between lower and upper bounds.
665        // Track the advancing frontier, to know when to produce utterances.
666        let mut frontier = Antichain::from_elem(T::minimum());
667        // Track accumulated counts for timestamps.
668        let mut timestamps = <ChangeBatch<_>>::new();
669        // Stash for serialized data yet to send.
670        let mut send_queue = std::collections::VecDeque::new();
671        let mut retain = Vec::new();
672
673        builder.build_reschedule(|_capabilities| {
674            move |frontiers| {
675                // We want to drain inputs no matter what.
676                // We could do this after the next step, as we are certain these timestamps will
677                // not be part of a closed frontier (as they have not yet been read). This has the
678                // potential to make things speedier as we scan less and keep a smaller footprint.
679                input.for_each(|_capability, counts| {
680                    timestamps.extend(counts.iter().cloned());
681                });
682
683                if should_write {
684                    if let Some(sink) = progress_sink.upgrade() {
685                        let mut sink = sink.borrow_mut();
686
687                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
688                        if <_ as PartialOrder>::less_than(
689                            &frontier.borrow(),
690                            &frontiers[0].frontier(),
691                        ) {
692                            let new_frontier = frontiers[0].frontier();
693
694                            // Extract the timestamp counts to announce.
695                            let mut announce = Vec::new();
696                            for (time, count) in timestamps.drain() {
697                                if !new_frontier.less_equal(&time) {
698                                    announce.push((time, count as usize));
699                                } else {
700                                    retain.push((time, count));
701                                }
702                            }
703                            timestamps.extend(retain.drain(..));
704
705                            // Announce the lower bound, upper bound, and timestamp counts.
706                            let progress = Progress {
707                                lower: frontier.elements().to_vec(),
708                                upper: new_frontier.to_vec(),
709                                counts: announce,
710                            };
711                            send_queue.push_back(Message::Progress(progress));
712
713                            // Advance our frontier to track our progress utterance.
714                            frontier = frontiers[0].frontier().to_owned();
715
716                            while let Some(message) = send_queue.front() {
717                                if let Some(duration) = sink.poll(message) {
718                                    // Reschedule after `duration` and then bail.
719                                    reactivator.activate_after(duration);
720                                    // Signal that work remains to be done.
721                                    return true;
722                                } else {
723                                    send_queue.pop_front();
724                                }
725                            }
726                        }
727                        // Signal incompleteness if messages remain to be sent.
728                        !sink.done() || !send_queue.is_empty()
729                    } else {
730                        timestamps.clear();
731                        send_queue.clear();
732                        // Signal that there are no outstanding writes.
733                        false
734                    }
735                } else {
736                    false
737                }
738            }
739        });
740    }
741}
742
743// pub mod kafka {
744
745//     use serde::{Serialize, Deserialize};
746//     use timely::scheduling::SyncActivator;
747//     use rdkafka::{ClientContext, config::ClientConfig};
748//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
749//     use rdkafka::error::{KafkaError, RDKafkaError};
750//     use super::BytesSink;
751
752//     use std::hash::Hash;
753//     use timely::progress::Timestamp;
754//     use timely::dataflow::{Scope, Stream};
755//     use crate::ExchangeData;
756//     use crate::lattice::Lattice;
757
758//     /// Creates a Kafka source from supplied configuration information.
759//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
760//     where
761//         G: Scope<Timestamp = T>,
762//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
763//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
764//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
765//     {
766//         super::source::build(scope, |activator| {
767//             let source = KafkaSource::new(addr, topic, group, activator);
768//             // An iterator combinator that yields every "duration" even if more items exist.
769//             // The implementation of such an iterator exists in the git history, or can be rewritten easily.
770//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
771//         })
772//     }
773
774//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
775//     where
776//         G: Scope<Timestamp = T>,
777//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
778//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
779//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
780//     {
781//         use std::rc::Rc;
782//         use std::cell::RefCell;
783//         use crate::hashable::Hashable;
784
785//         let sink = KafkaSink::new(addr, topic);
786//         let result = Rc::new(RefCell::new(sink));
787//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
788//         super::sink::build(
789//             &stream,
790//             sink_hash,
791//             Rc::downgrade(&result),
792//             Rc::downgrade(&result),
793//         );
794//         Box::new(result)
795
796//     }
797
798//     pub struct KafkaSource {
799//         consumer: BaseConsumer<ActivationConsumerContext>,
800//     }
801
802//     impl KafkaSource {
803//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
804//             let mut kafka_config = ClientConfig::new();
805//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
806//             kafka_config.set("bootstrap.servers", &addr.to_string());
807//             kafka_config
808//                 .set("enable.auto.commit", "false")
809//                 .set("auto.offset.reset", "earliest");
810
811//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
812//             kafka_config.set("fetch.message.max.bytes", "134217728");
813//             kafka_config.set("group.id", group);
814//             kafka_config.set("isolation.level", "read_committed");
815//             let activator = ActivationConsumerContext(activator);
816//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
817//             use rdkafka::consumer::Consumer;
818//             consumer.subscribe(&[topic]).unwrap();
819//             Self {
820//                 consumer,
821//             }
822//         }
823//     }
824
825//     pub struct Iter<D, T, R> {
826//         pub source: KafkaSource,
827//         phantom: std::marker::PhantomData<(D, T, R)>,
828//     }
829
830//     impl<D, T, R> Iter<D, T, R> {
831//         /// Constructs a new iterator from a bytes source.
832//         pub fn new_from(source: KafkaSource) -> Self {
833//             Self {
834//                 source,
835//                 phantom: std::marker::PhantomData,
836//             }
837//         }
838//     }
839
840//     impl<D, T, R> Iterator for Iter<D, T, R>
841//     where
842//         D: for<'a>Deserialize<'a>,
843//         T: for<'a>Deserialize<'a>,
844//         R: for<'a>Deserialize<'a>,
845//     {
846//         type Item = super::Message<D, T, R>;
847//         fn next(&mut self) -> Option<Self::Item> {
848//             use rdkafka::message::Message;
849//             self.source
850//                 .consumer
851//                 .poll(std::time::Duration::from_millis(0))
852//                 .and_then(|result| result.ok())
853//                 .and_then(|message| {
854//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
855//                 })
856//         }
857//     }
858
//     /// An implementation of [`ConsumerContext`] that activates the wrapped `SyncActivator`
//     /// when the message queue switches from empty to nonempty.
861//     struct ActivationConsumerContext(SyncActivator);
862
863//     impl ClientContext for ActivationConsumerContext { }
864
865//     impl ActivationConsumerContext {
866//         fn activate(&self) {
867//             self.0.activate().unwrap();
868//         }
869//     }
870
871//     impl ConsumerContext for ActivationConsumerContext {
872//         fn message_queue_nonempty_callback(&self) {
873//             self.activate();
874//         }
875//     }
876
877//     use std::time::Duration;
878//     use rdkafka::producer::DefaultProducerContext;
879//     use rdkafka::producer::{BaseRecord, ThreadedProducer};
880
881//     pub struct KafkaSink {
882//         topic: String,
883//         producer: ThreadedProducer<DefaultProducerContext>,
884//     }
885
886//     impl KafkaSink {
887//         pub fn new(addr: &str, topic: &str) -> Self {
888//             let mut config = ClientConfig::new();
889//             config.set("bootstrap.servers", &addr);
890//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
891//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
892//             config.set("queue.buffering.max.ms", &format!("{}", 10));
893//             let producer = config
894//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
895//                 .expect("creating kafka producer for kafka sinks failed");
896//             Self {
897//                 producer,
898//                 topic: topic.to_string(),
899//             }
900//         }
901//     }
902
903//     impl BytesSink for KafkaSink {
904//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
905//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);
906
907//             self.producer.send(record).err().map(|(e, _)| {
908//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
909//                     Duration::from_secs(1)
910//                 } else {
911//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
912//                     Duration::from_secs(1)
913//                 }
914//             })
915//         }
916//         fn done(&self) -> bool {
917//             self.producer.in_flight_count() == 0
918//         }
919//     }
920
921// }