differential_dataflow/capture.rs
//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that once true stay true, such as the exact changes data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of byte sources and byte sinks. For
//! example implementations of these traits, consult the commented text at the end of
//! this file.

use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}
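
// As an illustrative sketch (not part of the protocol implementation), a capture of a
// collection that adds the string "frank" at time 3 and removes it at time 5 could be
// conveyed by the following messages, in any order and with arbitrary duplication:
//
//     let messages: Vec<Message<String, u64, i64>> = vec![
//         Message::Updates(vec![("frank".to_string(), 3, 1)]),
//         Message::Updates(vec![("frank".to_string(), 5, -1)]),
//         Message::Progress(Progress { lower: vec![0], upper: vec![6], counts: vec![(3, 1), (5, 1)] }),
//         // A duplicate of the first message; receivers must deduplicate it.
//         Message::Updates(vec![("frank".to_string(), 3, 1)]),
//     ];
//
// The `Progress` statement asserts that times at or beyond `[0]` and not beyond `[6]` contain
// exactly one distinct update at time 3 and one at time 5, which lets a reader conclude the
// interval once it has observed both updates.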

/// An iterator that yields with a `None` every so often.
pub struct YieldingIter<I> {
    /// When set, a time after which we should return `None`.
    start: Option<std::time::Instant>,
    after: std::time::Duration,
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self {
        Self {
            start: None,
            after: yield_after,
            iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        if self.start.is_none() {
            self.start = Some(std::time::Instant::now());
        }
        let start = self.start.as_ref().unwrap();
        if start.elapsed() > self.after {
            self.start = None;
            None
        } else {
            match self.iter.next() {
                Some(x) => Some(x),
                None => {
                    self.start = None;
                    None
                }
            }
        }
    }
}
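
// A minimal usage sketch (with a hypothetical infinite source): wrap a message iterator so
// that a cooperative scheduler regains control at least every 10 milliseconds.
//
//     let source = (0..).map(|i| Message::<u64, u64, i64>::Updates(vec![(i, i, 1)]));
//     let mut yielding = YieldingIter::new_from(source, std::time::Duration::from_millis(10));
//     while let Some(message) = yielding.next() {
//         // Process `message`; after roughly 10ms `next()` returns `None`, and the loop
//         // exits so that the caller can yield and later resume iteration.
//     }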

/// A simple sink for items of some type `T`.
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
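
// A minimal sketch of a `Writer` implementation (purely illustrative; not used by this
// module): an in-memory sink that accepts every item immediately and never asks to be
// retried.
//
//     struct VecWriter<T> {
//         items: Vec<T>,
//     }
//
//     impl<T: Clone> Writer<T> for VecWriter<T> {
//         fn poll(&mut self, item: &T) -> Option<Duration> {
//             self.items.push(item.clone());
//             None // `None` reports that the item was accepted.
//         }
//         fn done(&self) -> bool {
//             true // Nothing is buffered, so all sent data is committed.
//         }
//     }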

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
    /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
    /// footprint, proportional to the mismatch between the received updates and progress messages.
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq,
        T: Hash + Eq,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted,
        /// but also so that they can deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated into `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which
        /// this frontier is beyond their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-orderness.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
}
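
// A usage sketch for `iterator::Iter` (with hand-written input): feed it duplicated,
// reordered messages and it yields each update exactly once, alongside an advancing frontier.
//
//     let messages = vec![
//         Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
//         Message::Updates(vec![("data".to_string(), 0u64, 1i64)]), // duplicate
//         Message::Progress(Progress { lower: vec![0], upper: vec![1], counts: vec![(0, 1)] }),
//     ];
//     let mut iter = iterator::Iter::new(messages.into_iter());
//     while let Some((updates, frontier)) = iter.next() {
//         // `updates` contains each distinct update once; `frontier` bounds what may still
//         // arrive. Here the single update at time 0 is released once the progress
//         // statement accounts for it, and `frontier` advances to `[1]`.
//     }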

/// Methods for recovering update streams from binary bundles.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::marker::{Send, Sync};
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate
            // is ignored
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS, and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems, though both of these
        // broadcasts are meant to carry very little data.
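        //
        // Concretely, the channels between these stages carry:
        //   MESSAGES -> UPDATES:  (D, T, R) triples, exchanged by hash of the triple;
        //   MESSAGES -> PROGRESS: (worker, Progress<T>) pairs, exchanged by the worker index;
        //   UPDATES  -> PROGRESS: (T, i64) count changes, exchanged by hash of the time;
        //   PROGRESS -> FEEDBACK: (worker, ChangeBatch<T>) frontier changes, exchanged by the worker index.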

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(&address);
        let activator2 = scope.activator_for(&address);
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
        let mut source = source_builder(activator);
        let (mut updates_out, updates) = messages_op.new_output();
        let (mut progress_out, progress) = messages_op.new_output();
        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();
            //
            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_vec(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (mut changes_out, changes) = updates_op.new_output();
        let (mut counts_out, counts) = updates_op.new_output();
        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (mut frontier_out, frontier) = progress_op.new_output();
        progress_op.build(move |_capability| {
            // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts in to the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend `progress_frontier` by `progress.upper`.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of the two frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_self, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
}
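
// A usage sketch for `source::build` (with a hand-written, finite message source): the
// builder closure receives a `SyncActivator` that a real source would use to wake the
// operator; this sketch ignores it and simply replays a fixed vector of messages.
//
//     timely::execute_from_args(std::env::args(), |worker| {
//         let _token = worker.dataflow::<u64, _, _>(|scope| {
//             let (token, stream) = source::build(scope.clone(), |_activator| {
//                 vec![
//                     Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
//                     Message::Progress(Progress { lower: vec![0], upper: vec![1], counts: vec![(0, 1)] }),
//                 ]
//                 .into_iter()
//             });
//             use timely::dataflow::operators::Inspect;
//             stream.inspect(|(d, t, r)| println!("update: {:?} at {:?} with {:?}", d, t, r));
//             token
//         });
//         // The token must stay alive for as long as the source should continue running.
//     })
//     .unwrap();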

/// Methods for recording update streams to binary bundles.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink, for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before this method, which
    /// will *not* perform the consolidation on the stream's behalf. If this is not
    /// performed before calling the method, the recorded output may not be correctly
    /// reconstructed by readers.
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D,T,R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been consolidated
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(&builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (mut updates_out, updates) = builder.new_output();

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = ChangeBatch::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(updates.replace(Vec::new())));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of bytes to send.
                    // ... but needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive indefinite data.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(&builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = ChangeBatch::new();
        // Stash for serialized data yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {
                let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &input.frontier.frontier(),
                        ) {
                            let new_frontier = input.frontier.frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = input.frontier.frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
}
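
// A usage sketch for `sink::build` (with hypothetical `some_writer` and `consolidated_stream`
// placeholders): updates and progress statements are recorded through `Weak` handles to a
// shared `Writer`, and dropping the strong `Rc` shuts the sink down. Remember that the
// stream must be consolidated before it reaches the sink.
//
//     let sink = std::rc::Rc::new(std::cell::RefCell::new(some_writer));
//     sink::build(
//         &consolidated_stream,          // a `Stream<G, (D, T, R)>` of consolidated updates
//         17,                            // `sink_hash`: selects the worker that writes progress
//         std::rc::Rc::downgrade(&sink), // updates sink
//         std::rc::Rc::downgrade(&sink), // progress sink
//     );
//     // Retain `sink` (for example in a returned token) until `sink.borrow().done()` holds.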

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::BytesSink;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)

//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a>Deserialize<'a>,
//         T: for<'a>Deserialize<'a>,
//         R: for<'a>Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
//     /// when the message queue switches from nonempty to empty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl BytesSink for KafkaSink {
//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }
939// }