use super::{Duration, Scope, Stream, SystemTime};
use crate::util::time_util;
use std::cmp;
use timely::dataflow::operators::generic::operator::source;
/// Upper bound on the number of items a source emits during a single
/// invocation of its operator closure: each invocation may raise the running
/// total by at most this many items before yielding and rescheduling itself.
/// NOTE(review): presumably chosen so a source that has fallen behind does not
/// hog the worker in one activation — confirm the intended batch size.
const MAX_OUTPUT: u128 = 1000;
/// Builds a timely source operator that emits generated items at a
/// time-varying rate.
///
/// Each time the operator runs it asks `cumulative_total_fun` how many items
/// should have been produced for the time elapsed since the operator was
/// constructed (as reported by `time_util::time_since`), and emits items until
/// it has caught up with that target. At most `MAX_OUTPUT` items are emitted
/// per run; the operator then re-activates itself until the final total has
/// been reached, at which point it drops its capability and goes quiet.
///
/// All items emitted in one run share a single timestamp (taken from
/// `time_util::nanos_timestamp`), which is also handed to `item_gen`.
///
/// * `item_gen` - produces one item, given the timestamp it will be sent at.
/// * `scope` - dataflow scope in which the operator is created.
/// * `cumulative_total_fun` - maps an elapsed duration to the total number of
///   items that should have been emitted by then. Presumably expected to be
///   non-decreasing; verify for new callers.
/// * `uptime` - the source retires once `cumulative_total_fun(uptime)` items
///   have been sent.
fn variable_rate_source<D, F, G, H>(
    mut item_gen: F,
    scope: &G,
    cumulative_total_fun: H,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
    H: Fn(Duration) -> u128 + 'static,
{
    source(scope, "Source", |capability, info| {
        let activator = scope.activator_for(&info.address[..]);
        let mut maybe_cap = Some(capability);
        let start_time = SystemTime::now();
        let mut vals_sent: u128 = 0;
        // Total number of items this source will ever emit.
        let vals_max = cumulative_total_fun(uptime);
        move |output| {
            if let Some(cap) = maybe_cap.as_mut() {
                let elapsed = time_util::time_since(start_time);
                // Catch up to the schedule, but never send more than
                // MAX_OUTPUT items in one run and never exceed the final total.
                let vals_to_send = cmp::min(
                    cmp::min(cumulative_total_fun(elapsed), vals_sent + MAX_OUTPUT),
                    vals_max,
                );

                // The wall clock is not monotonic, and downgrading a
                // capability to an earlier time panics. Clamp the timestamp
                // so it never moves behind the capability's current time.
                let wall_nanos = time_util::nanos_timestamp(SystemTime::now());
                let time_nanos = cmp::max(wall_nanos, *cap.time());
                cap.downgrade(&time_nanos);

                {
                    // One session per run: every item below shares `time_nanos`.
                    // Scoped so the borrow of `cap` ends before it may be dropped.
                    let mut session = output.session(&cap);
                    while vals_sent < vals_to_send {
                        session.give(item_gen(time_nanos));
                        vals_sent += 1;
                    }
                }

                if vals_sent >= vals_max {
                    // Done: dropping the capability lets downstream frontiers advance.
                    maybe_cap = None;
                } else {
                    // More to send later; ask the worker to schedule us again.
                    activator.activate();
                }
            }
        }
    })
}
/// Creates a source operator that emits items on a fixed schedule.
///
/// Delegates to `variable_rate_source`, using
/// `time_util::div_durations(elapsed, frequency)` as the cumulative item
/// target for a given elapsed time.
/// NOTE(review): this presumably yields the number of `frequency`-sized
/// intervals contained in `elapsed`, making `frequency` the spacing between
/// consecutive items rather than a rate — confirm against `time_util`.
///
/// * `item_gen` - produces one item, given the timestamp it will be sent at.
/// * `scope` - dataflow scope in which the operator is created.
/// * `frequency` - divisor applied to the elapsed time to obtain the target count.
/// * `uptime` - the source stops once the target count for this duration is reached.
pub fn fixed_rate_source<D, F, G>(
    item_gen: F,
    scope: &G,
    frequency: Duration,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
{
    // Cumulative number of items due once `elapsed` has passed.
    let items_due = move |elapsed: Duration| time_util::div_durations(elapsed, frequency);
    variable_rate_source(item_gen, scope, items_due, uptime)
}
/// Creates a source operator whose emission rate changes linearly over time.
///
/// Delegates to `variable_rate_source` with a cumulative item target made of
/// two parts:
///
/// * a steady part, `time_util::div_durations(elapsed, frequency_init)`, and
/// * an accelerated part, `(acceleration / 2) * t * t`, where `t` is the
///   elapsed time in whole microseconds, converted to `u128` with an `as` cast.
///
/// Float-to-integer `as` casts saturate, so a negative or NaN accelerated
/// part contributes zero: a negative `acceleration` does not slow the source
/// below its initial schedule.
///
/// * `item_gen` - produces one item, given the timestamp it will be sent at.
/// * `scope` - dataflow scope in which the operator is created.
/// * `frequency_init` - divisor applied to the elapsed time for the steady part.
/// * `acceleration` - coefficient of the quadratic term (per microsecond squared).
/// * `uptime` - the source stops once the target count for this duration is reached.
pub fn linear_rate_source<D, F, G>(
    item_gen: F,
    scope: &G,
    frequency_init: Duration,
    acceleration: f64,
    uptime: Duration,
) -> Stream<G, D>
where
    D: timely::Data + timely::ExchangeData,
    F: FnMut(u128) -> D + 'static,
    G: Scope<Timestamp = u128>,
{
    let half_acceleration = acceleration / 2.0;
    let cumulative_total = move |elapsed: Duration| {
        let micros = elapsed.as_micros() as f64;
        let steady_part = time_util::div_durations(elapsed, frequency_init);
        // Saturating cast: negative or NaN values become 0.
        let accelerated_part = (half_acceleration * micros * micros) as u128;
        steady_part + accelerated_part
    };
    variable_rate_source(item_gen, scope, cumulative_total, uptime)
}