1use super::ec2;
6use super::operators::save_to_file;
7use super::perf::latency_throughput_meter;
8use super::{Scope, Stream};
9use crate::util::process_util::run_as_process;
10use crate::util::string_to_static_str;
11use crate::util::time_util::current_datetime_str;
12
13use abomonation_derive::Abomonation;
14use structopt::StructOpt;
15
16use std::str::FromStr;
17
18const RESULTS_DIR: &str = "results/";
22const RESULTS_EXT: &str = ".out";
23fn make_results_path<T: AsRef<str>>(
24 exp_name: &str,
25 args: &[T],
26) -> &'static str {
27 let mut out = RESULTS_DIR.to_owned() + ¤t_datetime_str() + "_";
28 out += exp_name;
29 for arg in args {
30 out += "_";
31 out += arg.as_ref();
32 }
33 out += RESULTS_EXT;
34 string_to_static_str(out)
35}
36
// Base port for the EC2 host file: `TimelyParallelism::prepare_host_file`
// adds `experiment_num * nodes` to it before calling
// `ec2::prepare_ec2_host_file`.
const EC2_STARTING_PORT: u64 = 4000;
// Base port for the local host file: `TimelyParallelism::prepare_host_file`
// adds `experiment_num` to it before calling `ec2::prepare_local_host_file`.
const LOCAL_STARTING_PORT: u64 = 4000;
// Base port for the start barrier: `TimelyParallelism::barrier` adds
// `experiment_num * 2 * nodes` to it. Note that this one is a `u16`, unlike
// the two host-file base ports above.
const BARRIER_START_PORT: u16 = 5000;
/// Network configuration a `TimelyParallelism` runs under.
///
/// Parsed from the one-letter codes `s`, `l` and `e` by its `FromStr` impl.
#[derive(Abomonation, Copy, Clone, Debug, Eq, PartialEq)]
enum TimelyNetworkType {
    /// Exactly one node; `TimelyParallelism::validate` asserts `nodes == 1`
    /// for this variant, and no barrier is performed.
    SingleNode,
    /// Uses `ec2::prepare_local_host_file` and `ec2::local_barrier`.
    Local,
    /// Uses `ec2::prepare_ec2_host_file` and `ec2::ec2_barrier`.
    EC2,
}
50impl FromStr for TimelyNetworkType {
51 type Err = &'static str;
52 fn from_str(input: &str) -> Result<TimelyNetworkType, Self::Err> {
53 match input {
54 "s" => Ok(Self::SingleNode),
55 "l" => Ok(Self::Local),
56 "e" => Ok(Self::EC2),
57 _ => Err("Invalid network type (choices: 's', 'l', 'e')"),
58 }
59 }
60}
61
/// Describes how the current process learns which node it is.
///
/// Parsed from `l<id>` (local, explicit node id) or `e` (EC2) by its
/// `FromStr` impl.
#[derive(Abomonation, Copy, Clone, Debug, Eq, PartialEq)]
pub enum TimelyNodeInfo {
    /// Local run; the payload is this node's id.
    Local(u64),
    /// EC2 run; the node id is obtained from `ec2::get_ec2_node_number()`.
    EC2,
}
73impl FromStr for TimelyNodeInfo {
74 type Err = &'static str;
75 fn from_str(input: &str) -> Result<TimelyNodeInfo, Self::Err> {
76 if input == "e" {
77 Ok(Self::EC2)
78 } else if &input[0..1] == "l" {
79 match u64::from_str(&input[1..]) {
80 Ok(this_node) => Ok(Self::Local(this_node)),
81 Err(_err) => Err("Node ID should be a u64. Example usage: l3"),
82 }
83 } else {
84 Err("Invalid node info (choices: 'l<id>', 'e')")
85 }
86 }
87}
88impl TimelyNodeInfo {
89 fn this_node(&self) -> u64 {
90 match &self {
91 TimelyNodeInfo::Local(this_node) => *this_node,
92 TimelyNodeInfo::EC2 => ec2::get_ec2_node_number(),
93 }
94 }
95 fn is_main_node(&self) -> bool {
96 self.this_node() == 0
97 }
98}
99
// Parallelism configuration for one experiment run. It can be built through
// the constructors in the `impl` block or parsed from the command line via
// the `StructOpt` derive.
//
// Plain `//` comments are used on purpose: structopt turns `///` doc
// comments on the struct and its fields into CLI help text, so adding doc
// comments here would change the program's `--help` output.
#[derive(Abomonation, Copy, Clone, Debug, StructOpt)]
pub struct TimelyParallelism {
    // Number of workers; emitted as `-w <workers>` by `timely_args`.
    workers: u64,
    // Total number of nodes; emitted as `-n <nodes>` when greater than 1.
    nodes: u64,
    // Index of this node; emitted as `-p <this_node>` when `nodes > 1`.
    // The node takes part in a run only if `this_node < nodes`.
    this_node: u64,
    // Network type; the command-line default is "s" (single node).
    #[structopt(default_value = "s")]
    network: TimelyNetworkType,
    // Not a command-line argument (skipped, initialised to 0). Used as an
    // offset when choosing host-file and barrier ports.
    #[structopt(skip = 0u64)]
    experiment_num: u64,
}
122impl TimelyParallelism {
123 fn new_single_node(workers: u64) -> TimelyParallelism {
125 let result = TimelyParallelism {
126 workers,
127 nodes: 1,
128 this_node: 0,
129 network: TimelyNetworkType::SingleNode,
130 experiment_num: 0,
131 };
132 result.validate();
133 result
134 }
135 pub fn new_sequential() -> TimelyParallelism {
136 Self::new_single_node(1)
137 }
138 pub fn new_distributed_local(
139 workers: u64,
140 nodes: u64,
141 this_node: u64,
142 experiment_num: u64,
143 ) -> TimelyParallelism {
144 let result = TimelyParallelism {
145 workers,
146 nodes,
147 this_node,
148 network: TimelyNetworkType::Local,
149 experiment_num,
150 };
151 result.validate();
152 result
153 }
154 pub fn new_distributed_ec2(
155 workers: u64,
156 nodes: u64,
157 experiment_num: u64,
158 ) -> TimelyParallelism {
159 let result = TimelyParallelism {
160 workers,
161 nodes,
162 this_node: ec2::get_ec2_node_number(),
163 network: TimelyNetworkType::EC2,
164 experiment_num,
165 };
166 result.validate();
167 result
168 }
169 pub fn new_from_info(
170 node_info: TimelyNodeInfo,
171 workers: u64,
172 nodes: u64,
173 experiment_num: u64,
174 ) -> TimelyParallelism {
175 match node_info {
176 TimelyNodeInfo::Local(this_node) => Self::new_distributed_local(
177 workers,
178 nodes,
179 this_node,
180 experiment_num,
181 ),
182 TimelyNodeInfo::EC2 => {
183 Self::new_distributed_ec2(workers, nodes, experiment_num)
184 }
185 }
186 }
187
188 fn validate(&self) {
190 assert!(self.workers >= 1 && self.nodes >= 1);
191 if self.network == TimelyNetworkType::SingleNode {
192 assert!(self.nodes == 1);
193 }
194 }
195 fn is_participating(&self) -> bool {
196 self.this_node < self.nodes
197 }
198 fn prepare_host_file(&self) -> &'static str {
199 match self.network {
200 TimelyNetworkType::SingleNode => unreachable!(),
201 TimelyNetworkType::Local => {
202 let port = LOCAL_STARTING_PORT + self.experiment_num;
203 ec2::prepare_local_host_file(port)
204 }
205 TimelyNetworkType::EC2 => {
206 let port = EC2_STARTING_PORT + self.experiment_num * self.nodes;
207 ec2::prepare_ec2_host_file(port)
208 }
209 }
210 }
211
212 pub fn to_csv(self) -> String {
214 self.validate();
215 format!("{} wkrs, {} nodes", self.workers, self.nodes)
216 }
217 pub fn to_vec(self) -> Vec<String> {
218 self.validate();
219 vec![self.workers.to_string(), self.nodes.to_string()]
220 }
221
222 pub fn timely_args(&self) -> Option<Vec<String>> {
228 self.validate();
229 if !self.is_participating() {
230 None
231 } else {
232 let mut vec = vec!["-w".to_string(), self.workers.to_string()];
233 if self.nodes > 1 {
234 vec.push("-n".to_string());
235 vec.push(self.nodes.to_string());
236 vec.push("-p".to_string());
237 vec.push(self.this_node.to_string());
238
239 let hostfile = self.prepare_host_file();
240 vec.push("-h".to_string());
241 vec.push(hostfile.to_string());
242 }
243 Some(vec)
244 }
245 }
246
247 fn is_main_node(&self) -> bool {
249 self.this_node == 0
250 }
251 fn barrier(&self) {
253 match self.network {
254 TimelyNetworkType::SingleNode => (),
255 TimelyNetworkType::Local => {
256 let port = BARRIER_START_PORT
257 + ((self.experiment_num * 2 * self.nodes) as u16);
258 ec2::local_barrier(self.nodes, self.this_node, port);
259 }
260 TimelyNetworkType::EC2 => {
261 let port = BARRIER_START_PORT
262 + ((self.experiment_num * 2 * self.nodes) as u16);
263 ec2::ec2_barrier(self.nodes, self.this_node, port);
264 }
265 }
266 }
267}
268
/// Parameters of an experiment, as consumed by
/// `LatencyThroughputExperiment`.
///
/// Implementors must be `Copy`, parseable from the command line
/// (`StructOpt`) and exchangeable between Timely workers
/// (`timely::ExchangeData`).
pub trait ExperimentParams: Copy + StructOpt + timely::ExchangeData {
    /// Renders the parameters as text. `run_dataflow` embeds the result in
    /// every output line and `run_with_filename` prints it on the main node.
    fn to_csv(self) -> String;
    /// Renders the parameters as a list of strings. `run_single` uses them
    /// as components of the results file name.
    fn to_vec(self) -> Vec<String>;
    /// NOTE(review): presumably the duration of the experiment in seconds;
    /// no caller is visible in this part of the file — confirm.
    fn get_exp_duration_secs(&self) -> u64;
    /// Returns the parameters with the rate replaced by `rate_per_milli`.
    /// `run_all` calls this once per entry of its `rates_per_milli` slice,
    /// which it reports as an input rate in events/ms.
    fn set_rate(self, rate_per_milli: u64) -> Self;
}
287
288pub trait LatencyThroughputExperiment<P, I, O>: timely::ExchangeData
300where
301 P: ExperimentParams,
302 I: std::fmt::Debug + Clone + timely::Data,
303 O: std::fmt::Debug + Clone + timely::Data,
304{
305 fn get_name(&self) -> String;
307 fn build_dataflow<G: Scope<Timestamp = u128>>(
308 &self,
309 params: P,
310 scope: &mut G,
311 worker_index: usize,
312 ) -> (Stream<G, I>, Stream<G, O>);
313
314 fn run_dataflow<G: Scope<Timestamp = u128>>(
317 &self,
318 scope: &mut G,
319 params: P,
320 parallelism: TimelyParallelism,
321 worker_index: usize,
322 output_filename: &'static str,
323 ) {
324 let (input, output) = self.build_dataflow(params, scope, worker_index);
325 let latency_throughput = latency_throughput_meter(&input, &output);
331 let parallelism_csv = parallelism.to_csv();
332 let params_csv = params.to_csv();
333 save_to_file(
334 &latency_throughput,
335 output_filename,
336 move |(latency, throughput)| {
337 format!(
338 "{}, {}, {} ms, {} events/ms",
339 parallelism_csv, params_csv, latency, throughput
340 )
341 },
342 );
343 }
344 fn run_with_filename(
347 &'static self,
348 params: P,
349 parallelism: TimelyParallelism,
350 output_filename: &'static str,
351 ) {
352 if parallelism.is_main_node() {
353 println!(
354 "{} Experiment Parameters: {}, Parallelism: {}",
355 self.get_name(),
356 params.to_csv(),
357 parallelism.to_csv(),
358 );
359 }
360 let opt_args = parallelism.timely_args();
361 let node_index = parallelism.this_node;
362 match opt_args {
363 Some(mut args) => {
364 parallelism.barrier();
366
367 println!("[node {}] initializing experiment", node_index);
368 println!("[node {}] timely args: {:?}", node_index, args);
369 let func = move || {
370 timely::execute_from_args(args.drain(0..), move |worker| {
371 let worker_index = worker.index();
372 worker.dataflow(move |scope| {
373 self.run_dataflow(
374 scope,
375 params,
376 parallelism,
377 worker_index,
378 output_filename,
379 );
380 println!(
381 "[worker {}] setup complete",
382 worker_index
383 );
384 });
385 })
386 .unwrap();
387 };
388 run_as_process(func);
389 }
390 None => {
391 println!(
392 "[node {}] skipping experiment between nodes {:?}",
393 node_index,
394 (0..parallelism.nodes).collect::<Vec<u64>>()
395 );
396 }
402 }
403 }
404
405 fn run_single(&'static self, params: P, parallelism: TimelyParallelism) {
408 let mut args = Vec::new();
409 args.append(&mut params.to_vec());
410 args.append(&mut parallelism.to_vec());
411 let results_path = make_results_path(&self.get_name(), &args[..]);
412 self.run_with_filename(params, parallelism, results_path);
413 }
414 fn run_all(
416 &'static self,
417 node_info: TimelyNodeInfo,
418 default_params: P,
419 rates_per_milli: &[u64],
420 par_workers: &[u64],
421 par_nodes: &[u64],
422 ) {
423 let mut exp_num = 0;
425 for &par_w in par_workers {
426 for &par_n in par_nodes {
427 if node_info.is_main_node() {
428 println!(
429 "===== Parallelism: {} w, {} n =====",
430 par_w, par_n
431 );
432 }
433 let results_path = make_results_path(
434 &self.get_name(),
435 &[
436 &("w".to_owned() + &par_w.to_string()),
437 &("n".to_owned() + &par_n.to_string()),
438 ],
439 );
440 for &rate in rates_per_milli {
441 if node_info.is_main_node() {
442 println!("=== Input Rate (events/ms): {} ===", rate);
443 }
444 let params = default_params.set_rate(rate);
445 let parallelism = TimelyParallelism::new_from_info(
446 node_info, par_w, par_n, exp_num,
447 );
448 self.run_with_filename(params, parallelism, results_path);
449 exp_num += 1;
450 }
451 }
452 }
453 }
454}