ns3_parallel/
executor.rs

1use futures::stream::FuturesUnordered;
2use futures::StreamExt;
3use pbr::MultiBar;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::process::Output;
8use tokio::process::Command;
9use tokio::task::spawn_blocking;
10
11use crate::core::*;
12use crate::error::Error;
13
/// Number of attempts granted to each NS3 task when the builder is given no
/// explicit retry limit.
///
/// Declared as `u32` so it has the same type as the `retry_limit` fields it
/// provides the default for.
const DEFAULT_RETRY_LIMIT: u32 = 5;
15
16/// Used for ExecutorBuilder.
17///
18/// Specify the format of your config file. Default to `ConfigFormat::Toml`
19#[derive(Debug, Serialize, Deserialize, Clone)]
20pub enum ConfigFormat {
21    Ron,
22    Json,
23    Toml,
24    Yaml,
25}
26
27#[derive(Debug, Clone)]
28pub struct Executor<T: Default + BuildParam<P>, P: BuildCmd> {
29    config_path: String,
30    config_format: ConfigFormat,
31    ns3_path: String,
32    task_concurrent: usize,
33    retry_limit: u32,
34    pub configs: HashMap<String, T>,
35    pub outputs: HashMap<String, Vec<Task<P>>>,
36}
37
/// Builder for [`Executor`].
///
/// Every field is optional; `build` substitutes a default for each field that
/// is still `None`.
#[derive(Debug, Clone)]
pub struct ExecutorBuilder {
    /// Path of the config file. Defaults to `config.<ext>` where `<ext>`
    /// matches the selected format (for example `config.toml`).
    pub config_path: Option<String>,
    /// Format of the config file. Defaults to `ConfigFormat::Toml`.
    pub config_format: Option<ConfigFormat>,
    /// Directory expected to contain the `waf` script. Defaults to `/`.
    pub ns3_path: Option<String>,
    /// Maximum number of tasks kept in flight at once. Defaults to the value
    /// returned by `num_cpus::get`.
    pub task_concurrent: Option<usize>,
    /// Retry limit for each task. Defaults to `DEFAULT_RETRY_LIMIT`.
    pub retry_limit: Option<u32>,
}
46
47#[derive(Debug, Clone)]
48pub struct Task<P: BuildCmd> {
49    pub param: P,
50    pub output: Output,
51    pub stdout: String,
52    pub stderr: String,
53}
54
55impl<T: Default + BuildParam<P>, P: BuildCmd> Executor<T, P> {
56    pub fn get_config_path(&self) -> &str {
57        &self.config_path
58    }
59
60    pub fn get_config_format(&self) -> &ConfigFormat {
61        &self.config_format
62    }
63
64    pub fn get_ns3_path(&self) -> &str {
65        &self.ns3_path
66    }
67
68    pub fn get_task_concurrent(&self) -> usize {
69        self.task_concurrent
70    }
71
72    pub fn get_retry_limit(&self) -> u32 {
73        self.retry_limit
74    }
75
76    pub fn get_configs(&self) -> &HashMap<String, T> {
77        &self.configs
78    }
79
80    pub fn get_outputs(&self) -> &HashMap<String, Vec<Task<P>>> {
81        &self.outputs
82    }
83
84    pub async fn execute(&mut self) -> Result<(), Error> {
85        let ns3_dir = Path::new(&self.ns3_path);
86        println!("========== Build NS3 Program ==========");
87        build_ns3_program(ns3_dir).await?;
88        println!("Build NS3 Successfully!");
89        println!("========== Execute NS3 Tasks ==========");
90        let mut tasks = FuturesUnordered::new();
91        let mut params_map: HashMap<&String, Vec<P>> = self
92            .configs
93            .iter()
94            .map(|(k, v)| (k, v.build_param()))
95            .collect();
96        let total_count = params_map.iter().map(|(_, v)| v.len()).sum::<usize>() as u64;
97        let mb = MultiBar::new();
98        mb.println("Launch NS3 Tasks: ");
99        let mut pb1 = mb.create_bar(total_count);
100        mb.println("Complete NS3 Tasks: ");
101        let mut pb2 = mb.create_bar(total_count);
102        let progress = spawn_blocking(move || {
103            mb.listen();
104        });
105        for params in params_map.drain() {
106            for param in params.1 {
107                pb1.inc();
108                tasks.push(execute_ns3_program(
109                    params.0,
110                    ns3_dir,
111                    param,
112                    self.retry_limit,
113                ));
114                // If full, wait for one to finish.
115                if tasks.len() >= self.task_concurrent {
116                    if let Some(t) = tasks.next().await {
117                        let (n, t) = t?;
118                        pb2.inc();
119                        if let Some(v) = self.outputs.get_mut(n) {
120                            v.push(t);
121                        }
122                    }
123                }
124            }
125        }
126        pb1.finish();
127        // Wait for the remaining to finish.
128        while let Some(t) = tasks.next().await {
129            // handle response
130            let (n, t) = t?;
131            pb2.inc();
132            if let Some(v) = self.outputs.get_mut(n) {
133                v.push(t);
134            }
135        }
136        pb2.finish();
137        progress.await.unwrap();
138        Ok(())
139    }
140}
141
142fn check_config_file(config_path: &String, ext: &ConfigFormat) -> Result<PathBuf, Error> {
143    let config_file_path = match Path::new(&config_path).canonicalize() {
144        Ok(path) => path,
145        Err(e) => {
146            return Err(Error::FileNotFound(format!(
147                "Can not locate config file: {:?}.",
148                e
149            )));
150        }
151    };
152    match config_file_path.extension() {
153        Some(t) => match ext {
154            ConfigFormat::Ron => {
155                if t != "ron" {
156                    return Err(Error::InvalidConfig(
157                        "Config file must be a ron file.".to_string(),
158                    ));
159                }
160            }
161            ConfigFormat::Json => {
162                if t != "json" {
163                    return Err(Error::InvalidConfig(
164                        "Config file must be a json file.".to_string(),
165                    ));
166                }
167            }
168            ConfigFormat::Toml => {
169                if t != "toml" {
170                    return Err(Error::InvalidConfig(
171                        "Config file must be a toml file.".to_string(),
172                    ));
173                }
174            }
175            ConfigFormat::Yaml => {
176                if t != "yaml" {
177                    return Err(Error::InvalidConfig(
178                        "Config file must be a yaml file.".to_string(),
179                    ));
180                }
181            }
182        },
183        None => {
184            return Err(Error::InvalidConfig(
185                "Config file must have a valid file extension.".to_string(),
186            ));
187        }
188    }
189    Ok(config_file_path)
190}
191
192impl Default for ExecutorBuilder {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198impl ExecutorBuilder {
199    pub fn new() -> Self {
200        Self {
201            config_path: None,
202            config_format: None,
203            ns3_path: None,
204            task_concurrent: None,
205            retry_limit: None,
206        }
207    }
208
209    pub fn config_path(mut self, config_path: &str) -> Self {
210        self.config_path = Some(config_path.to_string());
211        self
212    }
213
214    pub fn config_format(mut self, config_format: ConfigFormat) -> Self {
215        self.config_format = Some(config_format);
216        self
217    }
218
219    pub fn ns3_path(mut self, ns3_path: &str) -> Self {
220        self.ns3_path = Some(ns3_path.to_string());
221        self
222    }
223
224    pub fn task_concurrent(mut self, task_concurrent: usize) -> Self {
225        self.task_concurrent = Some(task_concurrent);
226        self
227    }
228
229    pub fn retry_limit(mut self, retry_limit: u32) -> Self {
230        self.retry_limit = Some(retry_limit);
231        self
232    }
233
234    pub fn build<T: Default + BuildParam<P> + serde::de::DeserializeOwned, P: BuildCmd>(
235        self,
236    ) -> Result<Executor<T, P>, Error> {
237        let config_format = self.config_format.unwrap_or(ConfigFormat::Toml);
238        let mut config_path = self.config_path.unwrap_or_else(|| match &config_format {
239            ConfigFormat::Ron => "config.ron".to_string(),
240            ConfigFormat::Toml => "config.toml".to_string(),
241            ConfigFormat::Json => "config.json".to_string(),
242            ConfigFormat::Yaml => "config.yaml".to_string(),
243        });
244        let mut ns3_path = self.ns3_path.unwrap_or_else(|| "/".to_string());
245        let task_concurrent = self.task_concurrent.unwrap_or_else(num_cpus::get);
246        let retry_limit = self.retry_limit.unwrap_or(DEFAULT_RETRY_LIMIT as u32);
247        // Check config file
248        let config_file_path = check_config_file(&config_path, &config_format)?;
249        config_path = config_file_path.display().to_string();
250        // check ns3 directory
251        let ns3_dir_path = match Path::new(&ns3_path).join("waf").canonicalize() {
252            Ok(path) => path,
253            Err(e) => {
254                return Err(Error::FileNotFound(format!(
255                    "Can not locate ns3 dir: {:?}.",
256                    e
257                )));
258            }
259        };
260        ns3_path = ns3_dir_path.parent().unwrap().display().to_string();
261        let configs: HashMap<String, T> = match config_format {
262            ConfigFormat::Ron => {
263                let f = std::fs::File::open(config_file_path)?;
264                ron::de::from_reader(f)?
265            }
266            ConfigFormat::Json => {
267                let f = std::fs::File::open(config_file_path)?;
268                serde_json::from_reader(f)?
269            }
270            ConfigFormat::Yaml => {
271                let f = std::fs::File::open(config_file_path)?;
272                serde_yaml::from_reader(f)?
273            }
274            ConfigFormat::Toml => {
275                let configuration = std::fs::read_to_string(config_file_path)?;
276                let configs: toml::value::Table = match toml::from_str(&configuration) {
277                    Ok(t) => t,
278                    Err(e) => {
279                        return Err(Error::InvalidConfigFormat(format!(
280                            "Config file is not a valid toml file. Err: {:?}.",
281                            e
282                        )));
283                    }
284                };
285                configs
286                    .iter()
287                    .map(|(k, v)| (k.to_owned(), v.to_owned().try_into().unwrap()))
288                    .collect()
289            }
290        };
291        let outputs: HashMap<String, Vec<Task<P>>> = configs
292            .iter()
293            .map(|(k, _)| (k.to_owned(), vec![]))
294            .collect();
295
296        Ok(Executor {
297            config_path,
298            config_format,
299            ns3_path,
300            task_concurrent,
301            retry_limit,
302            configs,
303            outputs,
304        })
305    }
306}
307
308impl<P: BuildCmd> Task<P> {
309    pub fn read_raw(&self) -> (Vec<u8>, Vec<u8>) {
310        let stdout = self.output.stdout.clone();
311        let stderr = self.output.stderr.clone();
312        (stdout, stderr)
313    }
314
315    pub fn read_raw_stdout(&self) -> Vec<u8> {
316        self.output.stdout.clone()
317    }
318
319    pub fn read_raw_stderr(&self) -> Vec<u8> {
320        self.output.stderr.clone()
321    }
322
323    pub fn read(&self) -> (&str, &str) {
324        (&self.stdout, &self.stderr)
325    }
326
327    pub fn read_stdout(&self) -> &str {
328        &self.stdout
329    }
330
331    pub fn read_stderr(&self) -> &str {
332        &self.stderr
333    }
334}
335
336async fn execute_ns3_program<P: BuildCmd>(
337    name: &str,
338    ns3_dir: impl AsRef<Path>,
339    param: P,
340    retry_limit: u32,
341) -> Result<(&str, Task<P>), Error> {
342    let waf_path = ns3_dir.as_ref().join("waf");
343    let argument = param.build_cmd();
344    let mut cnt = 1;
345    let mut output = match Command::new(waf_path.as_os_str())
346        .arg("--run-no-build")
347        .arg(&argument)
348        .current_dir(&ns3_dir)
349        .output()
350        .await
351    {
352        Ok(output) => output,
353        Err(e) => {
354            return Err(Error::ExecuteFail(format!(
355                "Failed to execute NS3 program. Err: {:?}.",
356                e
357            )));
358        }
359    };
360    while !output.status.success() && cnt <= retry_limit {
361        cnt += 1;
362        if cnt > retry_limit {
363            return Err(Error::RetryLimitExceed);
364        }
365        output = match Command::new(waf_path.as_os_str())
366            .arg("--run-no-build")
367            .arg(&argument)
368            .current_dir(&ns3_dir)
369            .output()
370            .await
371        {
372            Ok(output) => output,
373            Err(e) => {
374                return Err(Error::ExecuteFail(format!(
375                    "Failed to execute NS3 program. Err: {:?}.",
376                    e
377                )));
378            }
379        };
380    }
381    let stdout = String::from_utf8(output.stdout.clone()).unwrap();
382    let stderr = String::from_utf8(output.stderr.clone()).unwrap();
383    Ok((
384        name,
385        Task {
386            param,
387            output,
388            stdout,
389            stderr,
390        },
391    ))
392}
393
394async fn build_ns3_program(ns3_dir: impl AsRef<Path>) -> Result<(), Error> {
395    let waf_path = ns3_dir.as_ref().join("waf");
396    let output = match Command::new(waf_path.as_os_str())
397        .arg("build")
398        .current_dir(&ns3_dir)
399        .output()
400        .await
401    {
402        Ok(output) => output,
403        Err(e) => {
404            return Err(Error::ExecuteFail(format!(
405                "Failed to execute NS3 program. Err: {:?}.",
406                e
407            )));
408        }
409    };
410    if output.status.success() {
411        Ok(())
412    } else {
413        Err(Error::BuildFail(format!(
414            "Failed to build NS3 program. Err: \n{:?}.\n",
415            String::from_utf8(output.stderr).unwrap()
416        )))
417    }
418}