timely_util/generators.rs

/*
    Useful generator patterns for source data
*/

use super::{Duration, Scope, Stream, SystemTime};
use crate::util::time_util;

use std::cmp;
use timely::dataflow::operators::generic::operator::source;

/*
    Data source which produces output items over time, where the number
    to produce is given by an arbitrary function of elapsed time
    returning the cumulative total, stopping after the uptime is
    complete.

    For performance reasons, the number of outputs produced in a single
    activation is capped at MAX_OUTPUT. See this note from
    https://github.com/TimelyDataflow/timely-dataflow
    > At the moment, the implementations of unary and binary operators
    > allow their closures to send un-bounded amounts of output. This
    > can cause unwelcome resource exhaustion, and poor performance
    > generally if the runtime needs to allocate lots of new memory to
    > buffer data sent in bulk without being given a chance to digest
    > it. It is commonly the case that when large amounts of data are
    > produced, they are eventually reduced given the opportunity.
*/
const MAX_OUTPUT: u128 = 1000;
fn variable_rate_source<D, F, G, H>(
    mut item_gen: F,
    scope: &G,
    cumulative_total_fun: H,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static, // Input: timestamp in nanoseconds
    G: Scope<Timestamp = u128>,
    H: Fn(Duration) -> u128 + 'static, // Input: time since source start time
{
    source(scope, "Source", |capability, info| {
        // Internal details of timely dataflow
        // 1. Acquire a re-activator for this operator.
        // 2. Wrap capability in an option so that we can release it when
        //    done by setting it to None
        let activator = scope.activator_for(&info.address[..]);
        let mut maybe_cap = Some(capability);

        // Initialize with current time; keep track of # vals sent
        let start_time = SystemTime::now();
        let mut vals_sent = 0;
        let vals_max = cumulative_total_fun(uptime);

        // Return closure
        move |output| {
            if let Some(cap) = maybe_cap.as_mut() {
                // Decide how behind we are on outputting values
                let elapsed = time_util::time_since(start_time);
                let vals_to_send = cmp::min(
                    cumulative_total_fun(elapsed),
                    vals_sent + MAX_OUTPUT,
                );

                // For debugging (this is nice because it shows if the system is
                // under full load -- in this case # of new values will usually
                // be throttled at MAX_OUTPUT)
                // println!("New items to send: {}", vals_to_send - vals_sent);

                // Output values to catch up
                let time_nanos = time_util::nanos_timestamp(SystemTime::now());
                cap.downgrade(&time_nanos);
                while vals_sent < vals_to_send && vals_sent < vals_max {
                    let item = item_gen(time_nanos);
                    output.session(&cap).give(item);
                    vals_sent += 1;
                }
                if vals_sent == vals_max {
                    maybe_cap = None;
                } else {
                    activator.activate();
                }
            }
        }
    })
}
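
/*
    Illustrative sketch, not part of the original API: variable_rate_source
    accepts any nondecreasing cumulative-total function, so other rate
    patterns can be built the same way as the sources below. For example,
    a hypothetical source that stays silent for a warmup delay and then
    emits at a fixed rate, assuming time_util::div_durations(d1, d2)
    returns the integer quotient d1 / d2 as its use in fixed_rate_source
    below suggests.
*/
#[allow(dead_code)]
fn delayed_fixed_rate_source<D, F, G>(
    item_gen: F,
    scope: &G,
    delay: Duration,
    frequency: Duration,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
{
    variable_rate_source(
        item_gen,
        scope,
        // Cumulative total: zero until `delay` has passed, then one item
        // per `frequency` thereafter
        move |elapsed| {
            if elapsed <= delay {
                0
            } else {
                time_util::div_durations(elapsed - delay, frequency)
            }
        },
        uptime,
    )
}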

/*
    Data source which produces output items at a constant rate,
    stopping after the uptime is complete
*/
pub fn fixed_rate_source<D, F, G>(
    item_gen: F,
    scope: &G,
    frequency: Duration,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
{
    variable_rate_source(
        item_gen,
        scope,
        move |elapsed| time_util::div_durations(elapsed, frequency),
        uptime,
    )
}
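
/*
    Illustrative usage sketch, not from the original module: one way to
    wire fixed_rate_source into a dataflow, following the usual timely
    source/inspect pattern. The function name and the chosen rates are
    assumptions for the example only.
*/
#[allow(dead_code)]
fn example_fixed_rate_dataflow() {
    use timely::dataflow::operators::Inspect;

    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<u128, _, _>(|scope| {
            fixed_rate_source(
                |time_nanos| time_nanos,  // each item is its generation time (ns)
                scope,
                Duration::from_millis(1), // target: one item per millisecond
                Duration::from_secs(5),   // stop after five seconds of uptime
            )
            .inspect(|t| println!("generated at {} ns", t));
        });
    })
    .expect("timely execution failed");
}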

/*
    Data source which produces output items at a linearly increasing rate,
    stopping after the uptime is complete
*/
pub fn linear_rate_source<D, F, G>(
    item_gen: F,
    scope: &G,
    frequency_init: Duration,
    acceleration: f64, // output events / microsecond^2
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
{
    variable_rate_source(
        item_gen,
        scope,
        move |elapsed| {
            // Rate at time t is (1 / f + a * t) events per microsecond,
            // where f is the initial period and a the acceleration, so the
            // cumulative total is the integral from 0 to T of (1 / f + a * t)
            //     = T / f + (a / 2) * T^2
            let micros_elapsed = elapsed.as_micros() as f64;
            time_util::div_durations(elapsed, frequency_init)
                + (((acceleration / 2.0) * micros_elapsed * micros_elapsed)
                    as u128)
        },
        uptime,
    )
}
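
/*
    Worked check of the cumulative formula above (an illustrative sketch,
    not part of the original module; the test name and sample parameters
    are assumptions, and it assumes time_util::div_durations returns the
    integer quotient of the two durations, as its use in fixed_rate_source
    suggests): with an initial period of 1 ms and acceleration
    1e-6 events / microsecond^2, after T = 1 second = 1_000_000 us the
    total is T / f + (a / 2) * T^2 = 1_000 + 500_000 = 501_000 items.
*/
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn linear_rate_cumulative_total() {
        let frequency_init = Duration::from_millis(1);
        let acceleration = 1e-6; // output events / microsecond^2
        let elapsed = Duration::from_secs(1);

        // Same computation as the closure in linear_rate_source
        let micros_elapsed = elapsed.as_micros() as f64;
        let total = time_util::div_durations(elapsed, frequency_init)
            + (((acceleration / 2.0) * micros_elapsed * micros_elapsed) as u128);

        assert_eq!(total, 501_000);
    }
}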