palimpsest_dataflow/capture.rs
1//! Logic related to capture and replay of differential collections.
2//!
3//! This module defines a protocol for capturing and replaying differential collections
4//! to streaming storage that may both duplicate and reorder messages. It records facts
5//! about the collection that once true stay true, such as the exact changes data undergo
6//! at each time, and the number of distinct updates at each time.
7//!
8//! The methods are parameterized by implementors of byte sources and byte sinks. For
9//! example implementations of these traits, consult the commented text at the end of
10//! this file.
11
12use serde::{Deserialize, Serialize};
13use std::time::Duration;
14
/// A message in the CDC V2 protocol.
///
/// A message is either a batch of update triples or a progress statement.
/// The type parameters are those of the update triples: `D` is the datum,
/// `T` is the time, and `R` is the difference.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    ///
    /// See [`Progress`] for the meaning of the interval bounds and counts.
    Progress(Progress<T>),
}
27
/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
///
/// Both bounds are stored as plain vectors of times; readers in this file
/// interpret them as frontiers (antichains) of times.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    ///
    /// Each entry pairs a time with the number of distinct updates at that time.
    pub counts: Vec<(T, usize)>,
}
43
/// A simple sink for items of type `T`.
///
/// Writers are polled with one item at a time. A writer that cannot accept
/// the item yet asks the caller to retry later, and the caller is expected to
/// offer the same item again.
pub trait Writer<T> {
    /// Offers `item` to the sink.
    ///
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
51
52/// A deduplicating, re-ordering iterator.
53pub mod iterator {
54
55 use super::{Message, Progress};
56 use crate::lattice::Lattice;
57 use std::hash::Hash;
58 use timely::order::PartialOrder;
59 use timely::progress::{
60 frontier::{AntichainRef, MutableAntichain},
61 Antichain, Timestamp,
62 };
63
64 /// A direct implementation of a deduplicating, re-ordering iterator.
65 ///
66 /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
67 /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
68 /// footprint, proportional to the mismatch between the received updates and progress messages.
69 pub struct Iter<I, D, T, R>
70 where
71 I: Iterator<Item = Message<D, T, R>>,
72 T: Hash + Ord + Lattice + Clone,
73 D: Hash + Eq,
74 T: Hash + Eq,
75 R: Hash + Eq,
76 {
77 /// Source of potentially duplicated, out of order cdc_v2 messages.
78 iterator: I,
79 /// Updates that have been received, but are still beyond `reported_frontier`.
80 ///
81 /// These updates are retained both so that they can eventually be transmitted,
82 /// but also so that they can deduplicate updates that may still be received.
83 updates: std::collections::HashSet<(D, T, R)>,
84 /// Frontier through which the iterator has reported updates.
85 ///
86 /// All updates not beyond this frontier have been reported.
87 /// Any information related to times not beyond this frontier can be discarded.
88 ///
89 /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
90 /// our two bounds on potential uncertainty in progress and update messages.
91 reported_frontier: Antichain<T>,
92 /// Frontier of accepted progress statements.
93 ///
94 /// All progress message counts for times not beyond this frontier have been
95 /// incorporated in to `messages_frontier`. This frontier also guides which
96 /// received progress statements can be incorporated: those whose for which
97 /// this frontier is beyond their lower bound.
98 progress_frontier: Antichain<T>,
99 /// Counts of outstanding messages at times.
100 ///
101 /// These counts track the difference between message counts at times announced
102 /// by progress messages, and message counts at times received in distinct updates.
103 messages_frontier: MutableAntichain<T>,
104 /// Progress statements that are not yet actionable due to out-of-Iterness.
105 ///
106 /// A progress statement becomes actionable once the progress frontier is beyond
107 /// its lower frontier. This ensures that the [0, lower) interval is already
108 /// incorporated, and that we will not leave a gap by incorporating the counts
109 /// and reflecting the progress statement's upper frontier.
110 progress_queue: Vec<Progress<T>>,
111 }
112
113 impl<D, T, R, I> Iterator for Iter<I, D, T, R>
114 where
115 I: Iterator<Item = Message<D, T, R>>,
116 T: Hash + Ord + Lattice + Clone,
117 D: Hash + Eq + Clone,
118 R: Hash + Eq + Clone,
119 {
120 type Item = (Vec<(D, T, R)>, Antichain<T>);
121 fn next(&mut self) -> Option<Self::Item> {
122 // Each call to `next` should return some newly carved interval of time.
123 // As such, we should read from our source until we find such a thing.
124 //
125 // An interval can be completed once our frontier of received progress
126 // information and our frontier of unresolved counts have advanced.
127 while let Some(message) = self.iterator.next() {
128 match message {
129 Message::Updates(mut updates) => {
130 // Discard updates at reported times, or duplicates at unreported times.
131 updates.retain(|dtr| {
132 self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
133 });
134 // Decrement our counts of accounted-for messages.
135 self.messages_frontier
136 .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
137 // Record the messages in our de-duplication collection.
138 self.updates.extend(updates.into_iter());
139 }
140 Message::Progress(progress) => {
141 // A progress statement may not be immediately actionable.
142 self.progress_queue.push(progress);
143 }
144 }
145
146 // Attempt to drain actionable progress messages.
147 // A progress message is actionable if `self.progress_frontier` is greater or
148 // equal to the message's lower bound.
149 while let Some(position) = self.progress_queue.iter().position(|p| {
150 <_ as PartialOrder>::less_equal(
151 &AntichainRef::new(&p.lower),
152 &self.progress_frontier.borrow(),
153 )
154 }) {
155 let mut progress = self.progress_queue.remove(position);
156 // Discard counts that have already been incorporated.
157 progress
158 .counts
159 .retain(|(time, _count)| self.progress_frontier.less_equal(time));
160 // Record any new reports of expected counts.
161 self.messages_frontier
162 .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
163 // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
164 let mut new_frontier = Antichain::new();
165 for time1 in progress.upper {
166 for time2 in self.progress_frontier.elements() {
167 new_frontier.insert(time1.join(time2));
168 }
169 }
170 self.progress_queue.retain(|p| {
171 !<_ as PartialOrder>::less_equal(
172 &AntichainRef::new(&p.upper),
173 &new_frontier.borrow(),
174 )
175 });
176 self.progress_frontier = new_frontier;
177 }
178
179 // Now check and see if our lower bound exceeds `self.reported_frontier`.
180 let mut lower_bound = self.progress_frontier.clone();
181 lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
182 if lower_bound != self.reported_frontier {
183 let to_publish = self
184 .updates
185 .iter()
186 .filter(|(_, t, _)| !lower_bound.less_equal(t))
187 .cloned()
188 .collect::<Vec<_>>();
189 self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
190 self.reported_frontier = lower_bound.clone();
191 return Some((to_publish, lower_bound));
192 }
193 }
194 None
195 }
196 }
197
198 impl<D, T, R, I> Iter<I, D, T, R>
199 where
200 I: Iterator<Item = Message<D, T, R>>,
201 T: Hash + Ord + Lattice + Clone + Timestamp,
202 D: Hash + Eq + Clone,
203 R: Hash + Eq + Clone,
204 {
205 /// Construct a new re-ordering, deduplicating iterator.
206 pub fn new(iterator: I) -> Self {
207 Self {
208 iterator,
209 updates: std::collections::HashSet::new(),
210 reported_frontier: Antichain::from_elem(T::minimum()),
211 progress_frontier: Antichain::from_elem(T::minimum()),
212 messages_frontier: MutableAntichain::new(),
213 progress_queue: Vec::new(),
214 }
215 }
216 }
217}
218
219/// Methods for recovering update streams from binary bundles.
220pub mod source {
221
222 use super::{Message, Progress};
223 use crate::{lattice::Lattice, ExchangeData};
224 use std::cell::RefCell;
225 use std::hash::Hash;
226 use std::marker::{Send, Sync};
227 use std::rc::Rc;
228 use std::sync::Arc;
229 use timely::dataflow::operators::generic::OutputBuilder;
230 use timely::dataflow::{
231 operators::{Capability, CapabilitySet},
232 Scope, Stream,
233 };
234 use timely::progress::Timestamp;
235 use timely::scheduling::SyncActivator;
236
237 // TODO(guswynn): implement this generally in timely
238 struct DropActivator {
239 activator: Arc<SyncActivator>,
240 }
241
242 impl Drop for DropActivator {
243 fn drop(&mut self) {
244 // Best effort: failure to activate
245 // is ignored
246 let _ = self.activator.activate();
247 }
248 }
249
/// Constructs a stream of updates from a source of messages.
///
/// The stream is built in the supplied `scope` and continues to run until
/// the returned `Box<Any>` token is dropped. The `source_builder` argument
/// is invoked with a `SyncActivator` that will re-activate the source.
///
/// The returned stream carries each distinct update `(data, time, diff)` as it
/// is first observed by the deduplicating UPDATES operator described below.
///
/// # Panics
///
/// The UPDATES operator asserts that two updates with the same `(data, time)`
/// carry the same difference, and the PROGRESS operator panics if its frontier
/// changes without any input having supplied a capability.
pub fn build<G, B, I, D, T, R>(
    scope: G,
    source_builder: B,
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
where
    G: Scope<Timestamp = T>,
    B: FnOnce(SyncActivator) -> I,
    I: Iterator<Item = Message<D, T, R>> + 'static,
    D: ExchangeData + Hash,
    T: ExchangeData + Hash + Timestamp + Lattice,
    R: ExchangeData + Hash,
{
    // Read messages are either updates or progress messages.
    // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
    // This includes both emitting updates, and setting expectations for update counts.
    //
    // Updates need to be deduplicated by (data, time), and we should exchange them by such.
    // Progress needs to be deduplicated by time, and we should exchange them by such.
    //
    // The first cut of this is a dataflow graph that looks like (flowing downward)
    //
    // 1. MESSAGES:
    //      Reads `Message` stream; maintains capabilities.
    //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
    //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
    //      Shares capabilities with downstream operator.
    // 2. UPDATES:
    //      Maintains and deduplicates updates.
    //      Ships updates once frontier advances.
    //      Ships counts to PROGRESS stage, by hash(time).
    // 3. PROGRESS:
    //      Maintains outstanding message counts by time. Tracks frontiers.
    //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage
    // 4. FEEDBACK:
    //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
    //
    // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES
    // (the two communicate through a worker-local shared frontier).
    // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS and PROGRESS must broadcast its changes
    // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems. Though, both of these
    // broadcast things are meant to be very reduced data.

    use crate::hashable::Hashable;
    use timely::dataflow::channels::pact::Exchange;
    use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
    use timely::progress::frontier::MutableAntichain;
    use timely::progress::ChangeBatch;

    // Some message distribution logic depends on the number of workers.
    let workers = scope.peers();

    // Filled in when the MESSAGES operator is built; returned to the caller as the drop token.
    let mut token = None;
    // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
    // It is seeded with one count of the minimum time per worker: each PROGRESS instance
    // starts with a reported frontier at the minimum time and broadcasts the retraction
    // of that frontier to every peer when it first advances.
    let mut antichain = MutableAntichain::new();
    antichain.update_iter(Some((T::minimum(), workers as i64)));
    let shared_frontier = Rc::new(RefCell::new(antichain));
    let shared_frontier2 = shared_frontier.clone();

    // Step 1: The MESSAGES operator.
    let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
    let address = messages_op.operator_info().address;
    // Handed to the source so that it can re-activate this operator.
    let activator = scope.sync_activator_for(address.to_vec());
    // Used by FEEDBACK to re-activate MESSAGES when the shared frontier changes.
    let activator2 = scope.activator_for(Rc::clone(&address));
    // Activates MESSAGES when the returned token is dropped, so that it can observe the drop.
    let drop_activator = DropActivator {
        activator: Arc::new(scope.sync_activator_for(address.to_vec())),
    };
    let mut source = source_builder(activator);
    let (updates_out, updates) = messages_op.new_output();
    let mut updates_out = OutputBuilder::from(updates_out);
    let (progress_out, progress) = messages_op.new_output();
    let mut progress_out = OutputBuilder::from(progress_out);

    messages_op.build(|capabilities| {
        // A Weak that communicates whether the returned token has been dropped.
        let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

        token = Some(drop_activator);

        // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
        let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
        let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
        // Capture the shared frontier to read out frontier updates to apply.
        let local_frontier = shared_frontier.clone();
        // The operator logic, run on each activation.
        move |_frontiers| {
            // First check to ensure that we haven't been terminated by someone dropping our tokens.
            if drop_activator_weak.upgrade().is_none() {
                // Give up our capabilities
                updates_caps.downgrade(&[]);
                progress_caps.downgrade(&[]);
                // never continue, even if we are (erroneously) activated again.
                return;
            }

            // Consult our shared frontier, and ensure capabilities are downgraded to it.
            let shared_frontier = local_frontier.borrow();
            updates_caps.downgrade(&shared_frontier.frontier());
            progress_caps.downgrade(&shared_frontier.frontier());

            // Next check to see if we have been terminated by the source being complete.
            if !updates_caps.is_empty() && !progress_caps.is_empty() {
                let mut updates = updates_out.activate();
                let mut progress = progress_out.activate();

                // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                // Specifically, there may not be one capability valid for all updates.
                let mut updates_session = updates.session(&updates_caps[0]);
                let mut progress_session = progress.session(&progress_caps[0]);

                // We presume the iterator will yield if appropriate.
                for message in source.by_ref() {
                    match message {
                        Message::Updates(mut updates) => {
                            updates_session.give_container(&mut updates);
                        }
                        Message::Progress(progress) => {
                            // We must send a copy of each progress message to all workers,
                            // but we can partition the counts across workers by timestamp.
                            let mut to_worker = vec![Vec::new(); workers];
                            for (time, count) in progress.counts {
                                to_worker[(time.hashed() as usize) % workers]
                                    .push((time, count));
                            }
                            // Every worker receives the bounds, paired with only its share of the counts.
                            for (worker, counts) in to_worker.into_iter().enumerate() {
                                progress_session.give((
                                    worker,
                                    Progress {
                                        lower: progress.lower.clone(),
                                        upper: progress.upper.clone(),
                                        counts,
                                    },
                                ));
                            }
                        }
                    }
                }
            }
        }
    });

    // Step 2: The UPDATES operator.
    let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
    let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
    let (changes_out, changes) = updates_op.new_output();
    let mut changes_out = OutputBuilder::from(changes_out);
    let (counts_out, counts) = updates_op.new_output();
    let mut counts_out = OutputBuilder::from(counts_out);

    updates_op.build(move |_capability| {
        // Deduplicates updates, and ships novel updates and the counts for each time.
        // For simplicity, this operator ships updates as they are discovered to be new.
        // This has the defect that on load we may have two copies of the data (shipped,
        // and here for deduplication).
        //
        // Filters may be pushed ahead of this operator, but because of deduplication we
        // may not push projections ahead of this operator (at least, not without fields
        // that are known to form keys, and even then only carefully).
        //
        // `pending` maps `(data, time)` to the difference seen for it.
        let mut pending = std::collections::HashMap::new();
        let mut change_batch = ChangeBatch::<T>::new();
        move |frontiers| {
            // Thin out deduplication buffer.
            // This is the moment in a more advanced implementation where we might send
            // the data for the first time, maintaining only one copy of each update live
            // at a time in the system.
            pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

            // Deduplicate newly received updates, sending new updates and timestamp counts.
            let mut changes = changes_out.activate();
            let mut counts = counts_out.activate();
            while let Some((capability, updates)) = input.next() {
                let mut changes_session = changes.session(&capability);
                let mut counts_session = counts.session(&capability);
                for (data, time, diff) in updates.iter() {
                    // Updates at times the input frontier has passed are ignored.
                    if frontiers[0].less_equal(time) {
                        if let Some(prior) =
                            pending.insert((data.clone(), time.clone()), diff.clone())
                        {
                            // A repeat of a known `(data, time)` must carry the same difference.
                            assert_eq!(&prior, diff);
                        } else {
                            // A novel update: account for it at its time, and ship it.
                            change_batch.update(time.clone(), -1);
                            changes_session.give((data.clone(), time.clone(), diff.clone()));
                        }
                    }
                }
                if !change_batch.is_empty() {
                    counts_session.give_iterator(change_batch.drain());
                }
            }
        }
    });

    // Step 3: The PROGRESS operator.
    let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
    let mut input = progress_op.new_input(
        &progress,
        Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
    );
    let mut counts =
        progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
    let (frontier_out, frontier) = progress_op.new_output();
    let mut frontier_out = OutputBuilder::from(frontier_out);
    progress_op.build(move |_capability| {
        // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.

        use timely::order::PartialOrder;
        use timely::progress::{frontier::AntichainRef, Antichain};

        let mut progress_queue = Vec::new();
        let mut progress_frontier = Antichain::from_elem(T::minimum());
        let mut updates_frontier = MutableAntichain::new();
        let mut reported_frontier = Antichain::from_elem(T::minimum());

        move |_frontiers| {
            let mut frontier = frontier_out.activate();

            // If the frontier changes we need a capability to express that.
            // Any capability should work; the downstream listener doesn't care.
            let mut capability: Option<Capability<T>> = None;

            // Drain all relevant update counts in to the mutable antichain tracking its frontier.
            while let Some((cap, counts)) = counts.next() {
                updates_frontier.update_iter(counts.iter().cloned());
                capability = Some(cap.retain());
            }
            // Drain all progress statements into the queue out of which we will work.
            while let Some((cap, progress)) = input.next() {
                progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                capability = Some(cap.retain());
            }

            // Extract and act on actionable progress messages.
            // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
            while let Some(position) = progress_queue.iter().position(|p| {
                <_ as PartialOrder>::less_equal(
                    &AntichainRef::new(&p.lower),
                    &progress_frontier.borrow(),
                )
            }) {
                // Extract progress statement.
                let mut progress = progress_queue.remove(position);
                // Discard counts that have already been incorporated.
                progress
                    .counts
                    .retain(|(time, _count)| progress_frontier.less_equal(time));
                // Record any new reports of expected counts.
                updates_frontier
                    .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                // Extend `progress_frontier` by progress.upper.
                let mut new_frontier = Antichain::new();
                for time1 in progress.upper {
                    for time2 in progress_frontier.elements() {
                        new_frontier.insert(time1.join(time2));
                    }
                }
                progress_frontier = new_frontier;
            }

            // Determine if the lower bound of frontiers have advanced, and transmit updates if so.
            let mut lower_bound = progress_frontier.clone();
            lower_bound.extend(updates_frontier.frontier().iter().cloned());
            if lower_bound != reported_frontier {
                let capability =
                    capability.expect("Changes occurred, without surfacing a capability");
                // Express the change as additions of the new bound and retractions of the old one.
                let mut changes = ChangeBatch::new();
                changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                let mut frontier_session = frontier.session(&capability);
                // Broadcast: one copy of the changes addressed to each worker.
                for peer in 0..workers {
                    frontier_session.give((peer, changes.clone()));
                }
                reported_frontier = lower_bound.clone();
            }
        }
    });

    // Step 4: The FEEDBACK operator.
    let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
    let mut input = feedback_op.new_input(
        &frontier,
        Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
    );
    feedback_op.build(move |_capability| {
        // Receive frontier changes and share the net result with MESSAGES.
        move |_frontiers| {
            let mut antichain = shared_frontier2.borrow_mut();
            let mut must_activate = false;
            while let Some((_cap, frontier_changes)) = input.next() {
                for (_self, input_changes) in frontier_changes.iter() {
                    // Apply the updates, and observe if the lower bound has changed.
                    if antichain
                        .update_iter(input_changes.unstable_internal_updates().iter().cloned())
                        .next()
                        .is_some()
                    {
                        must_activate = true;
                    }
                }
            }
            // If the lower bound has changed, we must activate MESSAGES.
            if must_activate {
                activator2.activate();
            }
        }
    });

    // `token` was set while building the MESSAGES operator above.
    (Box::new(token.unwrap()), changes)
}
561}
562
563/// Methods for recording update streams to binary bundles.
564pub mod sink {
565
566 use std::cell::RefCell;
567 use std::hash::Hash;
568 use std::rc::Weak;
569
570 use serde::{Deserialize, Serialize};
571
572 use timely::dataflow::channels::pact::{Exchange, Pipeline};
573 use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
574 use timely::dataflow::{Scope, Stream};
575 use timely::order::PartialOrder;
576 use timely::progress::{Antichain, ChangeBatch, Timestamp};
577
578 use super::{Message, Progress, Writer};
579 use crate::{lattice::Lattice, ExchangeData};
580
581 /// Constructs a sink, for recording the updates in `stream`.
582 ///
583 /// It is crucial that `stream` has been consolidated before this method, which
584 /// will *not* perform the consolidation on the stream's behalf. If this is not
585 /// performed before calling the method, the recorded output may not be correctly
586 /// reconstructed by readers.
587 pub fn build<G, BS, D, T, R>(
588 stream: &Stream<G, (D, T, R)>,
589 sink_hash: u64,
590 updates_sink: Weak<RefCell<BS>>,
591 progress_sink: Weak<RefCell<BS>>,
592 ) where
593 G: Scope<Timestamp = T>,
594 BS: Writer<Message<D, T, R>> + 'static,
595 D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
596 T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
597 R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
598 {
599 // First we record the updates that stream in.
600 // We can simply record all updates, under the presumption that the have been consolidated
601 // and so any record we see is in fact guaranteed to happen.
602 let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
603 let reactivator = stream
604 .scope()
605 .activator_for(builder.operator_info().address);
606 let mut input = builder.new_input(stream, Pipeline);
607 let (updates_out, updates) = builder.new_output();
608 let mut updates_out = OutputBuilder::from(updates_out);
609
610 builder.build_reschedule(move |_capability| {
611 let mut timestamps = <ChangeBatch<_>>::new();
612 let mut send_queue = std::collections::VecDeque::new();
613 move |_frontiers| {
614 let mut output = updates_out.activate();
615
616 // We want to drain inputs always...
617 input.for_each(|capability, updates| {
618 // Write each update out, and record the timestamp.
619 for (_data, time, _diff) in updates.iter() {
620 timestamps.update(time.clone(), 1);
621 }
622
623 // Now record the update to the writer.
624 send_queue.push_back(Message::Updates(std::mem::take(updates)));
625
626 // Transmit timestamp counts downstream.
627 output
628 .session(&capability)
629 .give_iterator(timestamps.drain());
630 });
631
632 // Drain whatever we can from the queue of bytes to send.
633 // ... but needn't do anything more if our sink is closed.
634 if let Some(sink) = updates_sink.upgrade() {
635 let mut sink = sink.borrow_mut();
636 while let Some(message) = send_queue.front() {
637 if let Some(duration) = sink.poll(message) {
638 // Reschedule after `duration` and then bail.
639 reactivator.activate_after(duration);
640 return true;
641 } else {
642 send_queue.pop_front();
643 }
644 }
645 // Signal incompleteness if messages remain to be sent.
646 !sink.done() || !send_queue.is_empty()
647 } else {
648 // We have been terminated, but may still receive indefinite data.
649 send_queue.clear();
650 // Signal that there are no outstanding writes.
651 false
652 }
653 }
654 });
655
656 // We use a lower-level builder here to get access to the operator address, for rescheduling.
657 let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
658 let reactivator = stream
659 .scope()
660 .activator_for(builder.operator_info().address);
661 let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
662 let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();
663
664 // We now record the numbers of updates at each timestamp between lower and upper bounds.
665 // Track the advancing frontier, to know when to produce utterances.
666 let mut frontier = Antichain::from_elem(T::minimum());
667 // Track accumulated counts for timestamps.
668 let mut timestamps = <ChangeBatch<_>>::new();
669 // Stash for serialized data yet to send.
670 let mut send_queue = std::collections::VecDeque::new();
671 let mut retain = Vec::new();
672
673 builder.build_reschedule(|_capabilities| {
674 move |frontiers| {
675 // We want to drain inputs no matter what.
676 // We could do this after the next step, as we are certain these timestamps will
677 // not be part of a closed frontier (as they have not yet been read). This has the
678 // potential to make things speedier as we scan less and keep a smaller footprint.
679 input.for_each(|_capability, counts| {
680 timestamps.extend(counts.iter().cloned());
681 });
682
683 if should_write {
684 if let Some(sink) = progress_sink.upgrade() {
685 let mut sink = sink.borrow_mut();
686
687 // If our frontier advances strictly, we have the opportunity to issue a progress statement.
688 if <_ as PartialOrder>::less_than(
689 &frontier.borrow(),
690 &frontiers[0].frontier(),
691 ) {
692 let new_frontier = frontiers[0].frontier();
693
694 // Extract the timestamp counts to announce.
695 let mut announce = Vec::new();
696 for (time, count) in timestamps.drain() {
697 if !new_frontier.less_equal(&time) {
698 announce.push((time, count as usize));
699 } else {
700 retain.push((time, count));
701 }
702 }
703 timestamps.extend(retain.drain(..));
704
705 // Announce the lower bound, upper bound, and timestamp counts.
706 let progress = Progress {
707 lower: frontier.elements().to_vec(),
708 upper: new_frontier.to_vec(),
709 counts: announce,
710 };
711 send_queue.push_back(Message::Progress(progress));
712
713 // Advance our frontier to track our progress utterance.
714 frontier = frontiers[0].frontier().to_owned();
715
716 while let Some(message) = send_queue.front() {
717 if let Some(duration) = sink.poll(message) {
718 // Reschedule after `duration` and then bail.
719 reactivator.activate_after(duration);
720 // Signal that work remains to be done.
721 return true;
722 } else {
723 send_queue.pop_front();
724 }
725 }
726 }
727 // Signal incompleteness if messages remain to be sent.
728 !sink.done() || !send_queue.is_empty()
729 } else {
730 timestamps.clear();
731 send_queue.clear();
732 // Signal that there are no outstanding writes.
733 false
734 }
735 } else {
736 false
737 }
738 }
739 });
740 }
741}
742
743// pub mod kafka {
744
745// use serde::{Serialize, Deserialize};
746// use timely::scheduling::SyncActivator;
747// use rdkafka::{ClientContext, config::ClientConfig};
748// use rdkafka::consumer::{BaseConsumer, ConsumerContext};
749// use rdkafka::error::{KafkaError, RDKafkaError};
750// use super::BytesSink;
751
752// use std::hash::Hash;
753// use timely::progress::Timestamp;
754// use timely::dataflow::{Scope, Stream};
755// use crate::ExchangeData;
756// use crate::lattice::Lattice;
757
758// /// Creates a Kafka source from supplied configuration information.
759// pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
760// where
761// G: Scope<Timestamp = T>,
762// D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
763// T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
764// R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
765// {
766// super::source::build(scope, |activator| {
767// let source = KafkaSource::new(addr, topic, group, activator);
768// // An iterator combinator that yields every "duration" even if more items exist.
769// // The implementation of such an iterator exists in the git history, or can be rewritten easily.
770// super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
771// })
772// }
773
774// pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
775// where
776// G: Scope<Timestamp = T>,
777// D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
778// T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
779// R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
780// {
781// use std::rc::Rc;
782// use std::cell::RefCell;
783// use crate::hashable::Hashable;
784
785// let sink = KafkaSink::new(addr, topic);
786// let result = Rc::new(RefCell::new(sink));
787// let sink_hash = (addr.to_string(), topic.to_string()).hashed();
788// super::sink::build(
789// &stream,
790// sink_hash,
791// Rc::downgrade(&result),
792// Rc::downgrade(&result),
793// );
794// Box::new(result)
795
796// }
797
798// pub struct KafkaSource {
799// consumer: BaseConsumer<ActivationConsumerContext>,
800// }
801
802// impl KafkaSource {
803// pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
804// let mut kafka_config = ClientConfig::new();
805// // This is mostly cargo-cult'd in from `source/kafka.rs`.
806// kafka_config.set("bootstrap.servers", &addr.to_string());
807// kafka_config
808// .set("enable.auto.commit", "false")
809// .set("auto.offset.reset", "earliest");
810
811// kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
812// kafka_config.set("fetch.message.max.bytes", "134217728");
813// kafka_config.set("group.id", group);
814// kafka_config.set("isolation.level", "read_committed");
815// let activator = ActivationConsumerContext(activator);
816// let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
817// use rdkafka::consumer::Consumer;
818// consumer.subscribe(&[topic]).unwrap();
819// Self {
820// consumer,
821// }
822// }
823// }
824
825// pub struct Iter<D, T, R> {
826// pub source: KafkaSource,
827// phantom: std::marker::PhantomData<(D, T, R)>,
828// }
829
830// impl<D, T, R> Iter<D, T, R> {
831// /// Constructs a new iterator from a bytes source.
832// pub fn new_from(source: KafkaSource) -> Self {
833// Self {
834// source,
835// phantom: std::marker::PhantomData,
836// }
837// }
838// }
839
840// impl<D, T, R> Iterator for Iter<D, T, R>
841// where
842// D: for<'a>Deserialize<'a>,
843// T: for<'a>Deserialize<'a>,
844// R: for<'a>Deserialize<'a>,
845// {
846// type Item = super::Message<D, T, R>;
847// fn next(&mut self) -> Option<Self::Item> {
848// use rdkafka::message::Message;
849// self.source
850// .consumer
851// .poll(std::time::Duration::from_millis(0))
852// .and_then(|result| result.ok())
853// .and_then(|message| {
854// message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
855// })
856// }
857// }
858
// /// An implementation of [`ConsumerContext`] that activates the wrapped
// /// [`SyncActivator`] when the message queue switches from empty to nonempty.
861// struct ActivationConsumerContext(SyncActivator);
862
863// impl ClientContext for ActivationConsumerContext { }
864
865// impl ActivationConsumerContext {
866// fn activate(&self) {
867// self.0.activate().unwrap();
868// }
869// }
870
871// impl ConsumerContext for ActivationConsumerContext {
872// fn message_queue_nonempty_callback(&self) {
873// self.activate();
874// }
875// }
876
877// use std::time::Duration;
878// use rdkafka::producer::DefaultProducerContext;
879// use rdkafka::producer::{BaseRecord, ThreadedProducer};
880
881// pub struct KafkaSink {
882// topic: String,
883// producer: ThreadedProducer<DefaultProducerContext>,
884// }
885
886// impl KafkaSink {
887// pub fn new(addr: &str, topic: &str) -> Self {
888// let mut config = ClientConfig::new();
889// config.set("bootstrap.servers", &addr);
890// config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
891// config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
892// config.set("queue.buffering.max.ms", &format!("{}", 10));
893// let producer = config
894// .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
895// .expect("creating kafka producer for kafka sinks failed");
896// Self {
897// producer,
898// topic: topic.to_string(),
899// }
900// }
901// }
902
903// impl BytesSink for KafkaSink {
904// fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
905// let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);
906
907// self.producer.send(record).err().map(|(e, _)| {
908// if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
909// Duration::from_secs(1)
910// } else {
911// // TODO(frank): report this error upwards so the user knows the sink is dead.
912// Duration::from_secs(1)
913// }
914// })
915// }
916// fn done(&self) -> bool {
917// self.producer.in_flight_count() == 0
918// }
919// }
920
921// }