//! Methods to construct flow-controlled sources.

use crate::Data;
use crate::order::{PartialOrder, TotalOrder};
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::probe::Handle;
use crate::dataflow::{Stream, Scope};

/// Output of the input reading function for `iterator_source`.
pub struct IteratorSourceInput<T: Clone, D: Data, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
    /// Lower bound on timestamps that can be emitted by this input in the future.
    pub lower_bound: T,
    /// Any `IntoIterator` of new input data in the form `(time, data)`: times must be
    /// monotonically increasing.
    pub data: I,
    /// A timestamp that represents the frontier that the probe should have
    /// reached before the function is invoked again to ingest additional input.
    pub target: T,
}

/// Construct a source that repeatedly calls the provided function to ingest input.
/// - The function can return `None` to signal the end of the input;
/// - otherwise, it should return an `IteratorSourceInput`, where:
///   * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future,
///     `Default::default()` can be used if this isn't needed (the source will assume that
///     the timestamps in `data` are monotonically increasing and will release capabilities
///     accordingly);
///   * `data` is any `IntoIterator` of new input data in the form `(time, data)`: times must be
///     monotonically increasing;
///   * `target` is a timestamp that represents the frontier that the probe should have
///     reached before the function is invoked again to ingest additional input.
///
/// The function will receive the current lower bound of timestamps that can be inserted,
/// `lower_bound`.
///
/// # Example
/// ```rust
/// extern crate timely;
///
/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
/// use timely::dataflow::operators::{probe, Probe, Inspect};
///
/// fn main() {
///     timely::execute_from_args(std::env::args(), |worker| {
///         let mut input = (0u64..100000).peekable();
///         worker.dataflow(|scope| {
///             let mut probe_handle = probe::Handle::new();
///             let probe_handle_2 = probe_handle.clone();
///
///             let mut next_t: u64 = 0;
///             iterator_source(
///                 scope,
///                 "Source",
///                 move |prev_t| {
///                     if let Some(first_x) = input.peek().cloned() {
///                         next_t = first_x / 100 * 100;
///                         Some(IteratorSourceInput {
///                             lower_bound: Default::default(),
///                             data: vec![
///                                 (next_t,
///                                  input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
///                             target: *prev_t,
///                         })
///                     } else {
///                         None
///                     }
///                 },
///                 probe_handle_2)
///             .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
///             .probe_with(&mut probe_handle);
///         });
///     }).unwrap();
/// }
/// ```
pub fn iterator_source<
    G: Scope,
    D: Data,
    DI: IntoIterator<Item=D>,
    I: IntoIterator<Item=(G::Timestamp, DI)>,
    F: FnMut(&G::Timestamp)->Option<IteratorSourceInput<G::Timestamp, D, DI, I>>+'static>(
        scope: &G,
        name: &str,
        mut input_f: F,
        probe: Handle<G::Timestamp>,
        ) -> Stream<G, D> where G::Timestamp: TotalOrder {

    let mut target = Default::default();
    source(scope, name, |cap, info| {
        let mut cap = Some(cap);
        let activator = scope.activator_for(&info.address[..]);
        move |output| {
            cap = cap.take().and_then(|mut cap| {
                loop {
                    // Ingest more input only once the probe's frontier has reached `target`.
                    if !probe.less_than(&target) {
                        if let Some(IteratorSourceInput {
                             lower_bound,
                             data,
                             target: new_target,
                         }) = input_f(cap.time()) {
                            target = new_target;
                            let mut has_data = false;
                            for (t, ds) in data.into_iter() {
                                // Advance the capability to each batch's timestamp before emitting.
                                cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
                                let mut session = output.session(&cap);
                                session.give_iterator(ds.into_iter());
                                has_data = true;
                            }

                            // Release timestamps below the lower bound reported by the input function.
                            cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
                            if !has_data {
                                // Nothing was emitted: keep the capability and wait for the next activation.
                                break Some(cap);
                            }
                        } else {
                            // End of input: drop the capability so the source can complete.
                            break None;
                        }
                    } else {
                        // The probe has not caught up yet: keep the capability and try again later.
                        break Some(cap);
                    }
                }
            });

            // Re-schedule this operator for as long as it still holds a capability.
            if cap.is_some() {
                activator.activate();
            }
        }
    })
}
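
// Illustrative sketch only, not part of timely: the loop inside `iterator_source`
// can be modelled with plain `u64` timestamps to see when the input function is
// called and how the held capability advances. The names `Batch`, `step`, and
// `demo` are hypothetical, and the probe frontier is assumed to be a single
// number supplied by the caller rather than a real `probe::Handle`.

```rust
/// Stand-in for `IteratorSourceInput` with `u64` timestamps and `u64` records.
struct Batch {
    lower_bound: u64,
    data: Vec<(u64, Vec<u64>)>,
    target: u64,
}

/// Models one activation of the source operator.
/// Returns the capability time still held afterwards, or `None` once input has ended.
fn step<F>(
    mut cap: u64,
    target: &mut u64,
    probe_frontier: u64,
    input_f: &mut F,
    out: &mut Vec<(u64, u64)>,
) -> Option<u64>
where
    F: FnMut(&u64) -> Option<Batch>,
{
    loop {
        // Mirrors `!probe.less_than(&target)`: the frontier has reached `target`.
        if probe_frontier >= *target {
            match input_f(&cap) {
                Some(Batch { lower_bound, data, target: new_target }) => {
                    *target = new_target;
                    let mut has_data = false;
                    for (t, ds) in data {
                        // A real capability can only be delayed forward in time.
                        assert!(t >= cap, "timestamps must be monotonically increasing");
                        cap = t;
                        out.extend(ds.into_iter().map(move |d| (t, d)));
                        has_data = true;
                    }
                    if cap < lower_bound {
                        cap = lower_bound;
                    }
                    if !has_data {
                        return Some(cap);
                    }
                }
                // End of input: the capability is dropped.
                None => return None,
            }
        } else {
            // Downstream has not caught up: hold the capability and wait.
            return Some(cap);
        }
    }
}

/// Drives `step` through a fixed sequence of probe frontiers and records
/// the emitted `(time, record)` pairs and the capability after each activation.
fn demo() -> (Vec<(u64, u64)>, Vec<Option<u64>>) {
    let mut pending = vec![0u64, 1, 2].into_iter();
    // Each batch carries one record at time `t` and asks to wait for frontier `t + 1`.
    let mut reader = move |_cap: &u64| {
        pending.next().map(|t| Batch {
            lower_bound: t + 1,
            data: vec![(t, vec![t * 10])],
            target: t + 1,
        })
    };
    let mut out = Vec::new();
    let mut caps = Vec::new();
    let mut target = 0u64;
    let mut cap = Some(0u64);
    for frontier in vec![0u64, 0, 1, 2, 3] {
        if let Some(c) = cap {
            cap = step(c, &mut target, frontier, &mut reader, &mut out);
            caps.push(cap);
        }
    }
    (out, caps)
}
```

// In this model the second activation ingests nothing because the frontier is
// still below `target`. Setting `lower_bound` to `t + 1` is what lets the
// frontier reach the next `target` at all; with a default `lower_bound` the
// held capability would stay at `t`.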