timely_util/
experiment.rs

1/*
2    Abstractions for running experiments in Timely.
3*/
4
5use super::ec2;
6use super::operators::save_to_file;
7use super::perf::latency_throughput_meter;
8use super::{Scope, Stream};
9use crate::util::process_util::run_as_process;
10use crate::util::string_to_static_str;
11use crate::util::time_util::current_datetime_str;
12
13use abomonation_derive::Abomonation;
14use structopt::StructOpt;
15
16use std::str::FromStr;
17
18/* Constants */
19
20// Results filenames
21const RESULTS_DIR: &str = "results/";
22const RESULTS_EXT: &str = ".out";
23fn make_results_path<T: AsRef<str>>(
24    exp_name: &str,
25    args: &[T],
26) -> &'static str {
27    let mut out = RESULTS_DIR.to_owned() + &current_datetime_str() + "_";
28    out += exp_name;
29    for arg in args {
30        out += "_";
31        out += arg.as_ref();
32    }
33    out += RESULTS_EXT;
34    string_to_static_str(out)
35}
36
// Base ports for distributed communication between nodes.
// prepare_host_file offsets the base by experiment_num (local network)
// or by experiment_num * nodes (EC2) to pick the host-file port.
const EC2_STARTING_PORT: u64 = 4000;
const LOCAL_STARTING_PORT: u64 = 4000;
// Base port for barriers; barrier() offsets it by experiment_num * 2 * nodes.
const BARRIER_START_PORT: u16 = 5000;
/*
    Types of networks where Timely distributed experiments can be run.
    Parsed from the strings "s", "l" and "e" by the FromStr impl.
*/
#[derive(Abomonation, Copy, Clone, Debug, Eq, PartialEq)]
enum TimelyNetworkType {
    // "s": everything runs on one node (validate() requires nodes == 1)
    SingleNode,
    // "l": several nodes, coordinated through the local host file / barrier
    Local,
    // "e": several nodes, coordinated through the EC2 host file / barrier
    EC2,
}
50impl FromStr for TimelyNetworkType {
51    type Err = &'static str;
52    fn from_str(input: &str) -> Result<TimelyNetworkType, Self::Err> {
53        match input {
54            "s" => Ok(Self::SingleNode),
55            "l" => Ok(Self::Local),
56            "e" => Ok(Self::EC2),
57            _ => Err("Invalid network type (choices: 's', 'l', 'e')"),
58        }
59    }
60}
61
/*
    Distributed node information:
    a network together with a node number.
    Used to run distributed experiments that can be either local
    (with an explicit node number) or over EC2 (where the node number is
    derived via ec2::get_ec2_node_number).
    Parsed from "l<id>" (e.g. "l3") or "e" by the FromStr impl.
*/
#[derive(Abomonation, Copy, Clone, Debug, Eq, PartialEq)]
pub enum TimelyNodeInfo {
    Local(u64), // node number
    EC2,        // node number is looked up when needed
}
73impl FromStr for TimelyNodeInfo {
74    type Err = &'static str;
75    fn from_str(input: &str) -> Result<TimelyNodeInfo, Self::Err> {
76        if input == "e" {
77            Ok(Self::EC2)
78        } else if &input[0..1] == "l" {
79            match u64::from_str(&input[1..]) {
80                Ok(this_node) => Ok(Self::Local(this_node)),
81                Err(_err) => Err("Node ID should be a u64. Example usage: l3"),
82            }
83        } else {
84            Err("Invalid node info (choices: 'l<id>', 'e')")
85        }
86    }
87}
88impl TimelyNodeInfo {
89    fn this_node(&self) -> u64 {
90        match &self {
91            TimelyNodeInfo::Local(this_node) => *this_node,
92            TimelyNodeInfo::EC2 => ec2::get_ec2_node_number(),
93        }
94    }
95    fn is_main_node(&self) -> bool {
96        self.this_node() == 0
97    }
98}
99
/*
    Parameters to run a Timely dataflow between several
    parallel workers or nodes.

    Values are Copy; the constructors in the impl call validate(), which
    asserts workers >= 1, nodes >= 1, and nodes == 1 for a SingleNode network.
*/
#[derive(Abomonation, Copy, Clone, Debug, StructOpt)]
pub struct TimelyParallelism {
    // Command line -w, should be >= 1
    workers: u64,
    // Command line -n, should be >= 1
    nodes: u64,
    // Command line -p, between 0 and nodes-1 (unused if nodes=1)
    this_node: u64,
    // Type of network (defaults to "s", i.e. single node, when parsed)
    #[structopt(default_value = "s")]
    network: TimelyNetworkType,
    // Experiment number -- to disambiguate unique experiments,
    // in case multiple are going on at once so they don't interfere.
    // This is incorporated into the port number. If not needed can just
    // be set to 0. Not read from the command line (skipped, fixed to 0).
    #[structopt(skip = 0u64)]
    experiment_num: u64,
}
122impl TimelyParallelism {
123    /* Constructors */
124    fn new_single_node(workers: u64) -> TimelyParallelism {
125        let result = TimelyParallelism {
126            workers,
127            nodes: 1,
128            this_node: 0,
129            network: TimelyNetworkType::SingleNode,
130            experiment_num: 0,
131        };
132        result.validate();
133        result
134    }
135    pub fn new_sequential() -> TimelyParallelism {
136        Self::new_single_node(1)
137    }
138    pub fn new_distributed_local(
139        workers: u64,
140        nodes: u64,
141        this_node: u64,
142        experiment_num: u64,
143    ) -> TimelyParallelism {
144        let result = TimelyParallelism {
145            workers,
146            nodes,
147            this_node,
148            network: TimelyNetworkType::Local,
149            experiment_num,
150        };
151        result.validate();
152        result
153    }
154    pub fn new_distributed_ec2(
155        workers: u64,
156        nodes: u64,
157        experiment_num: u64,
158    ) -> TimelyParallelism {
159        let result = TimelyParallelism {
160            workers,
161            nodes,
162            this_node: ec2::get_ec2_node_number(),
163            network: TimelyNetworkType::EC2,
164            experiment_num,
165        };
166        result.validate();
167        result
168    }
169    pub fn new_from_info(
170        node_info: TimelyNodeInfo,
171        workers: u64,
172        nodes: u64,
173        experiment_num: u64,
174    ) -> TimelyParallelism {
175        match node_info {
176            TimelyNodeInfo::Local(this_node) => Self::new_distributed_local(
177                workers,
178                nodes,
179                this_node,
180                experiment_num,
181            ),
182            TimelyNodeInfo::EC2 => {
183                Self::new_distributed_ec2(workers, nodes, experiment_num)
184            }
185        }
186    }
187
188    /* Private methods */
189    fn validate(&self) {
190        assert!(self.workers >= 1 && self.nodes >= 1);
191        if self.network == TimelyNetworkType::SingleNode {
192            assert!(self.nodes == 1);
193        }
194    }
195    fn is_participating(&self) -> bool {
196        self.this_node < self.nodes
197    }
198    fn prepare_host_file(&self) -> &'static str {
199        match self.network {
200            TimelyNetworkType::SingleNode => unreachable!(),
201            TimelyNetworkType::Local => {
202                let port = LOCAL_STARTING_PORT + self.experiment_num;
203                ec2::prepare_local_host_file(port)
204            }
205            TimelyNetworkType::EC2 => {
206                let port = EC2_STARTING_PORT + self.experiment_num * self.nodes;
207                ec2::prepare_ec2_host_file(port)
208            }
209        }
210    }
211
212    /* Data summaries */
213    pub fn to_csv(self) -> String {
214        self.validate();
215        format!("{} wkrs, {} nodes", self.workers, self.nodes)
216    }
217    pub fn to_vec(self) -> Vec<String> {
218        self.validate();
219        vec![self.workers.to_string(), self.nodes.to_string()]
220    }
221
222    /* Compute arguments to pass to Timely */
223    // Note 1: call only once per experiment. Creates/initializes a host file
224    // specific to that experiment.
225    // Note 2: returns None if this node is not involved in this experiment
226    // (i.e. node # is larger than number of nodes)
227    pub fn timely_args(&self) -> Option<Vec<String>> {
228        self.validate();
229        if !self.is_participating() {
230            None
231        } else {
232            let mut vec = vec!["-w".to_string(), self.workers.to_string()];
233            if self.nodes > 1 {
234                vec.push("-n".to_string());
235                vec.push(self.nodes.to_string());
236                vec.push("-p".to_string());
237                vec.push(self.this_node.to_string());
238
239                let hostfile = self.prepare_host_file();
240                vec.push("-h".to_string());
241                vec.push(hostfile.to_string());
242            }
243            Some(vec)
244        }
245    }
246
247    /* Main node (used for restricting output so it is less noisy) */
248    fn is_main_node(&self) -> bool {
249        self.this_node == 0
250    }
251    /* Barrier (used for synchronizing experiments) */
252    fn barrier(&self) {
253        match self.network {
254            TimelyNetworkType::SingleNode => (),
255            TimelyNetworkType::Local => {
256                let port = BARRIER_START_PORT
257                    + ((self.experiment_num * 2 * self.nodes) as u16);
258                ec2::local_barrier(self.nodes, self.this_node, port);
259            }
260            TimelyNetworkType::EC2 => {
261                let port = BARRIER_START_PORT
262                    + ((self.experiment_num * 2 * self.nodes) as u16);
263                ec2::ec2_barrier(self.nodes, self.this_node, port);
264            }
265        }
266    }
267}
268
/*
    Trait to capture parameters that form the input to a Timely experiment.

    to_csv should output the parameters separated by commas.
    to_vec should output the parameters as strings in a list.
    get_exp_duration_secs is the total time that the experiment runs (in secs).
    set_rate should vary one or more of the parameters to set the
    input throughput (in events / ms), which can be used to test throughput.

    ExperimentParams should be immutable and implement the Copy trait.
    For example, set_rate returns a new object.
*/
pub trait ExperimentParams: Copy + StructOpt + timely::ExchangeData {
    // Parameters separated by commas (used in result lines and logging).
    fn to_csv(self) -> String;
    // Parameters as a list of strings (used to build results filenames).
    fn to_vec(self) -> Vec<String>;
    // Total duration of the experiment, in seconds.
    fn get_exp_duration_secs(&self) -> u64;
    // Returns a copy of the parameters with the input rate (events / ms)
    // set to rate_per_milli.
    fn set_rate(self, rate_per_milli: u64) -> Self;
}
287
288/*
289    Trait to capture the full executable experiment.
290
291    To use, implement the get_name and build_dataflow methods.
292
293    One reason this trait is needed is in order to hide the top-level scope
294    parameter passed by run_dataflow to build the dataflow, which is instead made
295    generic in the build_dataflow method. This
296    is necessary because Rust generics are weird -- see
297    https://stackoverflow.com/questions/37606035/pass-generic-function-as-argument
298*/
299pub trait LatencyThroughputExperiment<P, I, O>: timely::ExchangeData
300where
301    P: ExperimentParams,
302    I: std::fmt::Debug + Clone + timely::Data,
303    O: std::fmt::Debug + Clone + timely::Data,
304{
305    /* Functionality to implement */
306    fn get_name(&self) -> String;
307    fn build_dataflow<G: Scope<Timestamp = u128>>(
308        &self,
309        params: P,
310        scope: &mut G,
311        worker_index: usize,
312    ) -> (Stream<G, I>, Stream<G, O>);
313
314    /* Functionality provided, but mostly considered private */
315    // The core dataflow to be run
316    fn run_dataflow<G: Scope<Timestamp = u128>>(
317        &self,
318        scope: &mut G,
319        params: P,
320        parallelism: TimelyParallelism,
321        worker_index: usize,
322        output_filename: &'static str,
323    ) {
324        let (input, output) = self.build_dataflow(params, scope, worker_index);
325        // Optional other meters, uncomment for testing
326        // volume_meter(&input);
327        // completion_meter(&output);
328        // latency_meter(&output);
329        // throughput_meter(&input, &output);
330        let latency_throughput = latency_throughput_meter(&input, &output);
331        let parallelism_csv = parallelism.to_csv();
332        let params_csv = params.to_csv();
333        save_to_file(
334            &latency_throughput,
335            output_filename,
336            move |(latency, throughput)| {
337                format!(
338                    "{}, {}, {} ms, {} events/ms",
339                    parallelism_csv, params_csv, latency, throughput
340                )
341            },
342        );
343    }
344    // Run an experiment with a filename
345    // Only necessary if the user wants a custom filename
346    fn run_with_filename(
347        &'static self,
348        params: P,
349        parallelism: TimelyParallelism,
350        output_filename: &'static str,
351    ) {
352        if parallelism.is_main_node() {
353            println!(
354                "{} Experiment Parameters: {}, Parallelism: {}",
355                self.get_name(),
356                params.to_csv(),
357                parallelism.to_csv(),
358            );
359        }
360        let opt_args = parallelism.timely_args();
361        let node_index = parallelism.this_node;
362        match opt_args {
363            Some(mut args) => {
364                // Barrier to make sure different experiments don't overlap!
365                parallelism.barrier();
366
367                println!("[node {}] initializing experiment", node_index);
368                println!("[node {}] timely args: {:?}", node_index, args);
369                let func = move || {
370                    timely::execute_from_args(args.drain(0..), move |worker| {
371                        let worker_index = worker.index();
372                        worker.dataflow(move |scope| {
373                            self.run_dataflow(
374                                scope,
375                                params,
376                                parallelism,
377                                worker_index,
378                                output_filename,
379                            );
380                            println!(
381                                "[worker {}] setup complete",
382                                worker_index
383                            );
384                        });
385                    })
386                    .unwrap();
387                };
388                run_as_process(func);
389            }
390            None => {
391                println!(
392                    "[node {}] skipping experiment between nodes {:?}",
393                    node_index,
394                    (0..parallelism.nodes).collect::<Vec<u64>>()
395                );
396                // Old -- no longer necessary, it was just a hack not as general
397                // as using the barrier to synchronize experiments
398                // let sleep_dur = params.get_exp_duration_secs();
399                // println!("[node {}] sleeping for {}", node_index, sleep_dur);
400                // sleep_for_secs(sleep_dur);
401            }
402        }
403    }
404
405    /* Functionality provided and exposed as the main options */
406    // Run a single experiment.
407    fn run_single(&'static self, params: P, parallelism: TimelyParallelism) {
408        let mut args = Vec::new();
409        args.append(&mut params.to_vec());
410        args.append(&mut parallelism.to_vec());
411        let results_path = make_results_path(&self.get_name(), &args[..]);
412        self.run_with_filename(params, parallelism, results_path);
413    }
414    // Run many experiments
415    fn run_all(
416        &'static self,
417        node_info: TimelyNodeInfo,
418        default_params: P,
419        rates_per_milli: &[u64],
420        par_workers: &[u64],
421        par_nodes: &[u64],
422    ) {
423        // Run experiment for all different configurations
424        let mut exp_num = 0;
425        for &par_w in par_workers {
426            for &par_n in par_nodes {
427                if node_info.is_main_node() {
428                    println!(
429                        "===== Parallelism: {} w, {} n =====",
430                        par_w, par_n
431                    );
432                }
433                let results_path = make_results_path(
434                    &self.get_name(),
435                    &[
436                        &("w".to_owned() + &par_w.to_string()),
437                        &("n".to_owned() + &par_n.to_string()),
438                    ],
439                );
440                for &rate in rates_per_milli {
441                    if node_info.is_main_node() {
442                        println!("=== Input Rate (events/ms): {} ===", rate);
443                    }
444                    let params = default_params.set_rate(rate);
445                    let parallelism = TimelyParallelism::new_from_info(
446                        node_info, par_w, par_n, exp_num,
447                    );
448                    self.run_with_filename(params, parallelism, results_path);
449                    exp_num += 1;
450                }
451            }
452        }
453    }
454}