timely_util/generators.rs
/*
    Useful generator patterns for source data
*/
4
5use super::{Duration, Scope, Stream, SystemTime};
6use crate::util::time_util;
7
8use std::cmp;
9use timely::dataflow::operators::generic::operator::source;
10
11/*
12 Data source which produces a number of output items
13 over time given by an arbitrary function giving the cumulative
14 total to produce, and stopping after the uptime is complete.
15
16 For performance reasons, caps outputs at a given point in time
17 by MAX_OUTPUT. See this note from
18 https://github.com/TimelyDataflow/timely-dataflow
19 > At the moment, the implementations of unary and binary operators
20 > allow their closures to send un-bounded amounts of output. This
21 > can cause unwelcome resource exhaustion, and poor performance
22 > generally if the runtime needs to allocate lots of new memory to
23 > buffer data sent in bulk without being given a chance to digest
24 > it. It is commonly the case that when large amounts of data are
25 > produced, they are eventually reduced given the opportunity.
26*/
27const MAX_OUTPUT: u128 = 1000;
28fn variable_rate_source<D, F, G, H>(
29 mut item_gen: F,
30 scope: &G,
31 cumulative_total_fun: H,
32 uptime: Duration,
33) -> Stream<G, D>
34where
35 D: timely::Data + timely::ExchangeData,
36 F: FnMut(u128) -> D + 'static, // Input: timestamp in nanoseconds
37 G: Scope<Timestamp = u128>,
38 H: Fn(Duration) -> u128 + 'static, // Input: time sicne source start time
39{
40 source(scope, "Source", |capability, info| {
41 // Internal details of timely dataflow
42 // 1. Acquire a re-activator for this operator.
43 // 2. Wrap capability in an option so that we can release it when
44 // done by setting it to None
45 let activator = scope.activator_for(&info.address[..]);
46 let mut maybe_cap = Some(capability);
47
48 // Initialize with current time; keep track of # vals sent
49 let start_time = SystemTime::now();
50 let mut vals_sent = 0;
51 let vals_max = cumulative_total_fun(uptime);
52
53 // Return closure
54 move |output| {
55 if let Some(cap) = maybe_cap.as_mut() {
56 // Decide how behind we are on outputting values
57 let elapsed = time_util::time_since(start_time);
58 let vals_to_send = cmp::min(
59 cumulative_total_fun(elapsed),
60 vals_sent + MAX_OUTPUT,
61 );
62
63 // For debugging (this is nice because it shows if the system is
64 // under full load -- in this case # of new values will usually
65 // be throttled at MAX_OUTPUT)
66 // println!("New items to send: {}", vals_to_send - vals_sent);
67
68 // Output values to catch up
69 let time_nanos = time_util::nanos_timestamp(SystemTime::now());
70 cap.downgrade(&time_nanos);
71 while vals_sent < vals_to_send && vals_sent < vals_max {
72 let item = item_gen(time_nanos);
73 output.session(&cap).give(item);
74 vals_sent += 1;
75 }
76 if vals_sent == vals_max {
77 maybe_cap = None;
78 } else {
79 activator.activate();
80 }
81 }
82 }
83 })
84}
85
86/*
87 Data source which produces output items at a constant rate,
88 stopping after the uptime is complete
89*/
90pub fn fixed_rate_source<D, F, G>(
91 item_gen: F,
92 scope: &G,
93 frequency: Duration,
94 uptime: Duration,
95) -> Stream<G, D>
96where
97 D: timely::Data + timely::ExchangeData,
98 F: FnMut(u128) -> D + 'static,
99 G: Scope<Timestamp = u128>,
100{
101 variable_rate_source(
102 item_gen,
103 scope,
104 move |elapsed| time_util::div_durations(elapsed, frequency),
105 uptime,
106 )
107}
108
109/*
110 Data source which produces output items at a linearly increasing rate,
111 stopping after the uptime is complete
112*/
113pub fn linear_rate_source<D, F, G>(
114 item_gen: F,
115 scope: &G,
116 frequency_init: Duration,
117 acceleration: f64, // output events / microsecond^2
118 uptime: Duration,
119) -> Stream<G, D>
120where
121 D: timely::Data + timely::ExchangeData,
122 F: FnMut(u128) -> D + 'static,
123 G: Scope<Timestamp = u128>,
124{
125 variable_rate_source(
126 item_gen,
127 scope,
128 move |elapsed| {
129 // Integral from 0 to T of (1 / f + a * t)
130 // = T / f + (a / 2) * T^2
131 let micros_elapsed = elapsed.as_micros() as f64;
132 time_util::div_durations(elapsed, frequency_init)
133 + (((acceleration / 2.0) * micros_elapsed * micros_elapsed)
134 as u128)
135 },
136 uptime,
137 )
138}