differential_dataflow/capture.rs

//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that once true stay true, such as the exact changes data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of message sources and message sinks. For
//! example implementations of these traits, consult the commented text at the end of
//! this file.

use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}
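
// A minimal sketch (with illustrative values) of the two kinds of statements a writer of
// this protocol emits, assuming `String` data, `u64` times, and `i64` differences: an
// update batch, and a progress statement accounting for the updates in an interval.
#[cfg(test)]
mod message_examples {
    use super::{Message, Progress};

    #[test]
    fn example_statements() {
        // An irrevocable statement: the multiplicity of "data" changes by +1 at time 2.
        let updates: Message<String, u64, i64> =
            Message::Updates(vec![("data".to_string(), 2, 1)]);
        // An irrevocable statement: for times in [0, 5), exactly one distinct update
        // occurs at time 2, and no updates occur at any other time in that interval.
        let progress: Message<String, u64, i64> = Message::Progress(Progress {
            lower: vec![0],
            upper: vec![5],
            counts: vec![(2, 1)],
        });
        let _ = (updates, progress);
    }
}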

/// An iterator that yields `None` every so often.
pub struct YieldingIter<I> {
    /// When set, the instant at which the current run of iteration began.
    /// Once `after` has elapsed since this instant, we yield `None`.
    start: Option<std::time::Instant>,
    after: std::time::Duration,
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self {
        Self {
            start: None,
            after: yield_after,
            iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        if self.start.is_none() {
            self.start = Some(std::time::Instant::now());
        }
        let start = self.start.as_ref().unwrap();
        if start.elapsed() > self.after {
            self.start = None;
            None
        } else {
            match self.iter.next() {
                Some(x) => Some(x),
                None => {
                    self.start = None;
                    None
                }
            }
        }
    }
}
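
// A minimal sketch of `YieldingIter` in use, assuming a generous inter-yield duration:
// the wrapper simply drains its input, and reports `None` once the input is exhausted
// (at which point its timer resets and it may be polled again later).
#[cfg(test)]
mod yielding_iter_examples {
    use super::YieldingIter;

    #[test]
    fn drains_and_yields() {
        let mut iter = YieldingIter::new_from(0..3, std::time::Duration::from_secs(60));
        assert_eq!(iter.next(), Some(0));
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), Some(2));
        // The underlying iterator is exhausted; the yielding wrapper reports `None`.
        assert_eq!(iter.next(), None);
    }
}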

/// A simple sink for items of type `T`.
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
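
// A minimal in-memory sketch of a `Writer` implementation, assuming storage that never
// pushes back; a real sink (see the commented Kafka text at the end of this file) would
// return `Some(duration)` from `poll` whenever an item cannot yet be accepted.
#[cfg(test)]
mod writer_examples {
    use super::Writer;
    use std::time::Duration;

    /// A sink that stores items in memory and always accepts them immediately.
    struct VecWriter<T> {
        items: Vec<T>,
    }

    impl<T: Clone> Writer<T> for VecWriter<T> {
        fn poll(&mut self, item: &T) -> Option<Duration> {
            // Accept the item immediately; `None` signals success to the caller.
            self.items.push(item.clone());
            None
        }
        fn done(&self) -> bool {
            // Everything handed to `poll` is already "committed" in memory.
            true
        }
    }

    #[test]
    fn accepts_immediately() {
        let mut writer = VecWriter { items: Vec::new() };
        assert_eq!(writer.poll(&"update"), None);
        assert!(writer.done());
        assert_eq!(writer.items.len(), 1);
    }
}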

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may have arbitrary duplication and be arbitrarily
    /// out of order, and yet it produces each update exactly once, in in-order batches. The
    /// iterator maintains a bounded memory footprint, proportional to the mismatch between the
    /// received updates and progress messages.
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted,
        /// and so that they can deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated into `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which this
        /// frontier is beyond their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-orderness.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
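
    // A minimal sketch of the iterator's behavior under duplication, assuming `u64` times:
    // the duplicated update is emitted exactly once, and only when the accompanying
    // progress statement completes the interval containing it.
    #[cfg(test)]
    mod examples {
        use super::Iter;
        use super::super::{Message, Progress};

        #[test]
        fn deduplicates_updates() {
            let messages = vec![
                // The same update arrives twice, before the progress statement covering it.
                Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
                Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
                // One distinct update occurs at time 0; the interval [0, 1) is then complete.
                Message::Progress(Progress { lower: vec![0], upper: vec![1], counts: vec![(0, 1)] }),
            ];
            let mut iter = Iter::new(messages.into_iter());
            let (updates, frontier) = iter.next().expect("an interval should be completed");
            assert_eq!(updates, vec![("data".to_string(), 0, 1)]);
            assert_eq!(frontier.elements().to_vec(), vec![1]);
        }
    }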
}

/// Methods for recovering update streams from binary bundles.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::marker::{Send, Sync};
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate is ignored.
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<dyn Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange it by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS, and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic communication, though both of these
        // broadcasts are expected to carry very little data.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(&address);
        let activator2 = scope.activator_for(&address);
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
        let mut source = source_builder(activator);
        let (mut updates_out, updates) = messages_op.new_output();
        let (mut progress_out, progress) = messages_op.new_output();
        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();
            //
            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities.
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // Never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_vec(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (mut changes_out, changes) = updates_op.new_output();
        let (mut counts_out, counts) = updates_op.new_output();
        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out the deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (mut frontier_out, frontier) = progress_op.new_output();
        progress_op.build(move |_capability| {
            // Receive progress statements and deduplicated counts. Track the lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts into the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract the progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend `progress_frontier` by `progress.upper`.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of the frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_self, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
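
    // A minimal sketch of wiring `build` into a timely dataflow, assuming the captured
    // messages are already available in memory as `messages: Vec<Message<String, u64, i64>>`
    // (all names here are illustrative):
    //
    // timely::execute_from_args(std::env::args(), move |worker| {
    //     let messages = messages.clone();
    //     let _token = worker.dataflow::<u64, _, _>(move |scope| {
    //         let (token, stream) = build(scope.clone(), move |_activator| messages.into_iter());
    //         // `stream` carries each (data, time, diff) update exactly once; dropping
    //         // the token shuts the source down, so hold it for as long as replay should run.
    //         token
    //     });
    // }).expect("timely execution failed");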
}

/// Methods for recording update streams to binary bundles.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before this method is called,
    /// as the method will *not* perform the consolidation on the stream's behalf. If this
    /// is not done, the recorded output may not be correctly reconstructed by readers.
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D,T,R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been
        // consolidated, and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(&builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (mut updates_out, updates) = builder.new_output();

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = ChangeBatch::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(updates.replace(Vec::new())));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of messages to send.
                    // ... but we needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive data indefinitely.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(&builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = ChangeBatch::new();
        // Stash for messages yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {
                let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &input.frontier.frontier(),
                        ) {
                            let new_frontier = input.frontier.frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = input.frontier.frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
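
    // A minimal sketch of attaching this sink to a stream, assuming some writer type
    // implementing `Writer<Message<D, T, R>>` and an existing `stream` and `sink_hash`
    // (all names here are illustrative; see the commented Kafka text at the end of this
    // file for a fuller example). The shared `Rc` keeps the writer alive, while the sink
    // itself holds only `Weak` handles to it.
    //
    // let writer = std::rc::Rc::new(std::cell::RefCell::new(my_writer));
    // build(&stream, sink_hash, std::rc::Rc::downgrade(&writer), std::rc::Rc::downgrade(&writer));
    // // Keep `writer` alive for as long as the sink should keep writing.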
}

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::Writer;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)

//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a>Deserialize<'a>,
//         T: for<'a>Deserialize<'a>,
//         R: for<'a>Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
//     /// when the message queue switches from empty to nonempty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl<D, T, R> Writer<super::Message<D, T, R>> for KafkaSink
//     where
//         D: Serialize,
//         T: Serialize,
//         R: Serialize,
//     {
//         fn poll(&mut self, message: &super::Message<D, T, R>) -> Option<Duration> {
//             let bytes = bincode::serialize(message).expect("failed to serialize message");
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(&bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }