// differential_dataflow/capture.rs
1//! Logic related to capture and replay of differential collections.
2//!
3//! This module defines a protocol for capturing and replaying differential collections
4//! to streaming storage that may both duplicate and reorder messages. It records facts
5//! about the collection that once true stay true, such as the exact changes data undergo
6//! at each time, and the number of distinct updates at each time.
7//!
8//! The methods are parameterized by implementors of byte sources and byte sinks. For
9//! example implementations of these traits, consult the commented text at the end of
10//! this file.
11
12use std::time::Duration;
13use serde::{Deserialize, Serialize};
14
/// A message in the CDC V2 protocol.
///
/// Messages may be delivered over transports that duplicate and reorder them
/// (see the module docs), so each variant states an irrevocable fact that
/// remains true regardless of how many times, or in what order, it is observed.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}
27
/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    ///
    /// Interpreted as an antichain: the statement covers times greater than or
    /// equal to some element of `lower` (consumers wrap it in `AntichainRef`).
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    ///
    /// Interpreted as an antichain: covered times are *not* greater than or
    /// equal to any element of `upper`.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}
43
/// A simple sink for items of type `T`.
///
/// Implementors accept items one at a time via `poll`, which either accepts
/// the item or asks the caller to retry after a suggested delay. `done`
/// reports whether all accepted items have been durably committed.
pub trait Writer<T> {
    /// Attempts to write `item` to the sink.
    ///
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
51
52/// A deduplicating, re-ordering iterator.
53pub mod iterator {
54
55 use super::{Message, Progress};
56 use crate::lattice::Lattice;
57 use std::hash::Hash;
58 use timely::order::PartialOrder;
59 use timely::progress::{
60 frontier::{AntichainRef, MutableAntichain},
61 Antichain,
62 Timestamp,
63 };
64
65 /// A direct implementation of a deduplicating, re-ordering iterator.
66 ///
67 /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
68 /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
69 /// footprint, proportional to the mismatch between the received updates and progress messages.
70 pub struct Iter<I, D, T, R>
71 where
72 I: Iterator<Item = Message<D, T, R>>,
73 T: Hash + Ord + Lattice + Clone,
74 D: Hash + Eq,
75 T: Hash + Eq,
76 R: Hash + Eq,
77 {
78 /// Source of potentially duplicated, out of order cdc_v2 messages.
79 iterator: I,
80 /// Updates that have been received, but are still beyond `reported_frontier`.
81 ///
82 /// These updates are retained both so that they can eventually be transmitted,
83 /// but also so that they can deduplicate updates that may still be received.
84 updates: std::collections::HashSet<(D, T, R)>,
85 /// Frontier through which the iterator has reported updates.
86 ///
87 /// All updates not beyond this frontier have been reported.
88 /// Any information related to times not beyond this frontier can be discarded.
89 ///
90 /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
91 /// our two bounds on potential uncertainty in progress and update messages.
92 reported_frontier: Antichain<T>,
93 /// Frontier of accepted progress statements.
94 ///
95 /// All progress message counts for times not beyond this frontier have been
96 /// incorporated in to `messages_frontier`. This frontier also guides which
97 /// received progress statements can be incorporated: those whose for which
98 /// this frontier is beyond their lower bound.
99 progress_frontier: Antichain<T>,
100 /// Counts of outstanding messages at times.
101 ///
102 /// These counts track the difference between message counts at times announced
103 /// by progress messages, and message counts at times received in distinct updates.
104 messages_frontier: MutableAntichain<T>,
105 /// Progress statements that are not yet actionable due to out-of-Iterness.
106 ///
107 /// A progress statement becomes actionable once the progress frontier is beyond
108 /// its lower frontier. This ensures that the [0, lower) interval is already
109 /// incorporated, and that we will not leave a gap by incorporating the counts
110 /// and reflecting the progress statement's upper frontier.
111 progress_queue: Vec<Progress<T>>,
112 }
113
    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Each item is a batch of now-certain updates, plus the new frontier
        /// through which all updates have been reported.
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        // (Progress statements later add the expected totals back in.)
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    // NOTE(review): if `self.progress_frontier` were ever empty, the
                    // join loop produces an empty `new_frontier` — presumably
                    // unreachable because it starts at `T::minimum()`; confirm.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    // Drop queued statements wholly subsumed by the new frontier.
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                // The reportable frontier is the lower bound (union of antichain
                // elements) of progress and outstanding-message frontiers.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    // Publish exactly the retained updates not beyond the new frontier.
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            // Source exhausted without completing a new interval.
            None
        }
    }
198
199 impl<D, T, R, I> Iter<I, D, T, R>
200 where
201 I: Iterator<Item = Message<D, T, R>>,
202 T: Hash + Ord + Lattice + Clone + Timestamp,
203 D: Hash + Eq + Clone,
204 R: Hash + Eq + Clone,
205 {
206 /// Construct a new re-ordering, deduplicating iterator.
207 pub fn new(iterator: I) -> Self {
208 Self {
209 iterator,
210 updates: std::collections::HashSet::new(),
211 reported_frontier: Antichain::from_elem(T::minimum()),
212 progress_frontier: Antichain::from_elem(T::minimum()),
213 messages_frontier: MutableAntichain::new(),
214 progress_queue: Vec::new(),
215 }
216 }
217 }
218}
219
220/// Methods for recovering update streams from binary bundles.
221pub mod source {
222
223 use super::{Message, Progress};
224 use crate::{lattice::Lattice, ExchangeData};
225 use std::cell::RefCell;
226 use std::hash::Hash;
227 use std::rc::Rc;
228 use std::marker::{Send, Sync};
229 use std::sync::Arc;
230 use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
231 use timely::dataflow::operators::generic::OutputBuilder;
232 use timely::progress::Timestamp;
233 use timely::scheduling::SyncActivator;
234
    // TODO(guswynn): implement this generally in timely
    /// Activates a wrapped `SyncActivator` when dropped.
    ///
    /// Returned (boxed) as the capture token: dropping it wakes the MESSAGES
    /// operator, which observes the drop via a failed `Weak::upgrade` of
    /// `activator` and shuts itself down.
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate
            // is ignored
            let _ = self.activator.activate();
        }
    }
247
    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
    ///
    /// Internally this builds a four-stage dataflow (MESSAGES, UPDATES,
    /// PROGRESS, FEEDBACK) that deduplicates updates and progress statements
    /// before releasing them; the block comment in the body describes the plan.
    ///
    /// NOTE(review): the signature uses `Scope<T>` as a bare type and `Stream<T, _>`,
    /// whereas upstream versions are generic over `G: Scope<Timestamp = T>`;
    /// `scope` is also used by value several times below. Confirm this compiles
    /// against the local timely API.
    pub fn build<B, I, D, T, R>(
        scope: Scope<T>,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<T, Vec<(D, T, R)>>)
    where
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as RAW.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems. Though, both of these
        // broadcast things are meant to be very reduced data.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        // Token handed back to the caller; dropping it shuts the dataflow down.
        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        // Initialized with multiplicity `workers`, as every worker's PROGRESS
        // operator broadcasts its frontier changes to every FEEDBACK operator.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope);
        let address = messages_op.operator_info().address;
        let activator = scope.worker().sync_activator_for(address.to_vec());
        let activator2 = scope.activator_for(Rc::clone(&address));
        let drop_activator = DropActivator { activator: Arc::new(scope.worker().sync_activator_for(address.to_vec())) };
        let mut source = source_builder(activator);
        let (updates_out, updates) = messages_op.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);
        let (progress_out, progress) = messages_op.new_output();
        let mut progress_out = OutputBuilder::from(progress_out);

        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();
            //
            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                // (Empty capability sets mean the shared frontier is empty: all done.)
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_container(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope);
        let mut input = updates_op.new_input(updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (changes_out, changes) = updates_op.new_output();
        let mut changes_out = OutputBuilder::from(changes_out);
        let (counts_out, counts) = updates_op.new_output();
        let mut counts_out = OutputBuilder::from(counts_out);

        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                input.for_each(|capability, updates| {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                // A duplicate must carry an identical difference.
                                assert_eq!(&prior, diff);
                            } else {
                                // Novel update: one fewer outstanding message at `time`.
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                });
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope);
        let mut input = progress_op.new_input(
            progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (frontier_out, frontier) = progress_op.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        progress_op.build(move |_capability| {
            // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts in to the mutable antichain tracking its frontier.
                counts.for_each(|cap, counts| {
                    updates_frontier.update_iter(counts.iter().cloned());
                    // NOTE(review): `retain(0)` presumably retains the capability
                    // for output 0 — confirm against the local timely API.
                    capability = Some(cap.retain(0));
                });
                // Drain all progress statements into the queue out of which we will work.
                input.for_each(|cap, progress| {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain(0));
                });

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend self.progress_frontier by progress.upper.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of frontiers have advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    // Express the frontier change as a ChangeBatch (+1 new, -1 old).
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    // Broadcast the change to every worker's FEEDBACK operator.
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope);
        let mut input = feedback_op.new_input(
            frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                input.for_each(|_cap, frontier_changes| {
                    for (_self, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                });
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
549}
550
551/// Methods for recording update streams to binary bundles.
552pub mod sink {
553
554 use std::hash::Hash;
555 use std::cell::RefCell;
556 use std::rc::Weak;
557
558 use serde::{Deserialize, Serialize};
559
560 use timely::order::PartialOrder;
561 use timely::progress::{Antichain, ChangeBatch, Timestamp};
562 use timely::dataflow::Stream;
563 use timely::dataflow::channels::pact::{Exchange, Pipeline};
564 use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
565
566 use crate::{lattice::Lattice, ExchangeData};
567 use super::{Writer, Message, Progress};
568
    /// Constructs a sink, for recording the updates in `stream`.
    ///
    /// `sink_hash` selects the single worker (`sink_hash % peers`) that writes
    /// progress statements. `updates_sink` and `progress_sink` are weak handles
    /// to the writers; once they can no longer be upgraded the operators drop
    /// their queued work and report completion.
    ///
    /// It is crucial that `stream` has been consolidated before this method, which
    /// will *not* perform the consolidation on the stream's behalf. If this is not
    /// performed before calling the method, the recorded output may not be correctly
    /// reconstructed by readers.
    pub fn build<BS, D, T, R>(
        stream: Stream<T, Vec<(D, T, R)>>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        BS: Writer<Message<D,T,R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that the have been consolidated
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(stream.clone(), Pipeline);
        let (updates_out, updates) = builder.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);

        builder.build_reschedule(
            move |_capability| {
                // Counts of updates per timestamp, forwarded to the progress writer.
                let mut timestamps = <ChangeBatch<_>>::new();
                // Messages accepted but not yet acknowledged by the sink.
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(std::mem::take(updates)));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of bytes to send.
                    // ... but needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                // `true` signals incomplete work to the scheduler.
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive indefinite data.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        // All timestamp counts converge on the single writer worker.
        let mut input = builder.new_input(updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = <ChangeBatch<_>>::new();
        // Stash for serialized data yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &frontiers[0].frontier(),
                        ) {
                            let new_frontier = frontiers[0].frontier();

                            // Extract the timestamp counts to announce.
                            // Counts for still-open timestamps are retained for later.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = frontiers[0].frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // Writer dropped: discard state and report completion.
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
725}
726
727// pub mod kafka {
728
729// use serde::{Serialize, Deserialize};
730// use timely::scheduling::SyncActivator;
731// use rdkafka::{ClientContext, config::ClientConfig};
732// use rdkafka::consumer::{BaseConsumer, ConsumerContext};
733// use rdkafka::error::{KafkaError, RDKafkaError};
734// use super::BytesSink;
735
736// use std::hash::Hash;
737// use timely::progress::Timestamp;
738// use timely::dataflow::{Scope, Stream};
739// use crate::ExchangeData;
740// use crate::lattice::Lattice;
741
742// /// Creates a Kafka source from supplied configuration information.
// pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, Vec<(D, T, R)>>)
// where
// G: Scope<Timestamp = T>,
746// D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
747// T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
748// R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
749// {
750// super::source::build(scope, |activator| {
751// let source = KafkaSource::new(addr, topic, group, activator);
752// // An iterator combinator that yields every "duration" even if more items exist.
753// // The implementation of such an iterator exists in the git history, or can be rewritten easily.
754// super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
755// })
756// }
757
// pub fn create_sink<G, D, T, R>(stream: &Stream<G, Vec<(D, T, R)>>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
// where
// G: Scope<Timestamp = T>,
761// D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
762// T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
763// R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
764// {
765// use std::rc::Rc;
766// use std::cell::RefCell;
767// use crate::hashable::Hashable;
768
769// let sink = KafkaSink::new(addr, topic);
770// let result = Rc::new(RefCell::new(sink));
771// let sink_hash = (addr.to_string(), topic.to_string()).hashed();
772// super::sink::build(
773// &stream,
774// sink_hash,
775// Rc::downgrade(&result),
776// Rc::downgrade(&result),
777// );
778// Box::new(result)
779
780// }
781
782// pub struct KafkaSource {
783// consumer: BaseConsumer<ActivationConsumerContext>,
784// }
785
786// impl KafkaSource {
787// pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
788// let mut kafka_config = ClientConfig::new();
789// // This is mostly cargo-cult'd in from `source/kafka.rs`.
790// kafka_config.set("bootstrap.servers", &addr.to_string());
791// kafka_config
792// .set("enable.auto.commit", "false")
793// .set("auto.offset.reset", "earliest");
794
795// kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
796// kafka_config.set("fetch.message.max.bytes", "134217728");
797// kafka_config.set("group.id", group);
798// kafka_config.set("isolation.level", "read_committed");
799// let activator = ActivationConsumerContext(activator);
800// let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
801// use rdkafka::consumer::Consumer;
802// consumer.subscribe(&[topic]).unwrap();
803// Self {
804// consumer,
805// }
806// }
807// }
808
809// pub struct Iter<D, T, R> {
810// pub source: KafkaSource,
811// phantom: std::marker::PhantomData<(D, T, R)>,
812// }
813
814// impl<D, T, R> Iter<D, T, R> {
815// /// Constructs a new iterator from a bytes source.
816// pub fn new_from(source: KafkaSource) -> Self {
817// Self {
818// source,
819// phantom: std::marker::PhantomData,
820// }
821// }
822// }
823
824// impl<D, T, R> Iterator for Iter<D, T, R>
825// where
826// D: for<'a>Deserialize<'a>,
827// T: for<'a>Deserialize<'a>,
828// R: for<'a>Deserialize<'a>,
829// {
830// type Item = super::Message<D, T, R>;
831// fn next(&mut self) -> Option<Self::Item> {
832// use rdkafka::message::Message;
833// self.source
834// .consumer
835// .poll(std::time::Duration::from_millis(0))
836// .and_then(|result| result.ok())
837// .and_then(|message| {
838// message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
839// })
840// }
841// }
842
843// /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
844// /// when the message queue switches from nonempty to empty.
845// struct ActivationConsumerContext(SyncActivator);
846
847// impl ClientContext for ActivationConsumerContext { }
848
849// impl ActivationConsumerContext {
850// fn activate(&self) {
851// self.0.activate().unwrap();
852// }
853// }
854
855// impl ConsumerContext for ActivationConsumerContext {
856// fn message_queue_nonempty_callback(&self) {
857// self.activate();
858// }
859// }
860
861// use std::time::Duration;
862// use rdkafka::producer::DefaultProducerContext;
863// use rdkafka::producer::{BaseRecord, ThreadedProducer};
864
865// pub struct KafkaSink {
866// topic: String,
867// producer: ThreadedProducer<DefaultProducerContext>,
868// }
869
870// impl KafkaSink {
871// pub fn new(addr: &str, topic: &str) -> Self {
872// let mut config = ClientConfig::new();
873// config.set("bootstrap.servers", &addr);
874// config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
875// config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
876// config.set("queue.buffering.max.ms", &format!("{}", 10));
877// let producer = config
878// .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
879// .expect("creating kafka producer for kafka sinks failed");
880// Self {
881// producer,
882// topic: topic.to_string(),
883// }
884// }
885// }
886
887// impl BytesSink for KafkaSink {
888// fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
889// let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);
890
891// self.producer.send(record).err().map(|(e, _)| {
892// if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
893// Duration::from_secs(1)
894// } else {
895// // TODO(frank): report this error upwards so the user knows the sink is dead.
896// Duration::from_secs(1)
897// }
898// })
899// }
900// fn done(&self) -> bool {
901// self.producer.in_flight_count() == 0
902// }
903// }
904
905// }