1use futures::stream::FuturesUnordered;
2use futures::StreamExt;
3use pbr::MultiBar;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::process::Output;
8use tokio::process::Command;
9use tokio::task::spawn_blocking;
10
11use crate::core::*;
12use crate::error::Error;
13
/// Fallback retry limit used by `ExecutorBuilder::build` when the builder was
/// not given an explicit `retry_limit` (it is cast to `u32` at that point).
const DEFAULT_RETRY_LIMIT: i32 = 5;
15
/// Serialization format of the configuration file loaded by
/// `ExecutorBuilder::build`.
///
/// The format also determines the file extension that `check_config_file`
/// requires the configuration file to have.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ConfigFormat {
    /// RON file; the file extension must be `ron`.
    Ron,
    /// JSON file; the file extension must be `json`.
    Json,
    /// TOML file; the file extension must be `toml`.
    Toml,
    /// YAML file; the file extension must be `yaml`.
    Yaml,
}
26
/// Runs NS3 simulation tasks described by a configuration file.
///
/// `T` is the per-entry configuration type; each `T` is expanded into a list
/// of `P` parameters through `BuildParam::build_param`, and every `P` becomes
/// one task. Instances are created with `ExecutorBuilder::build`.
#[derive(Debug, Clone)]
pub struct Executor<T: Default + BuildParam<P>, P: BuildCmd> {
    /// Canonicalized path of the configuration file, stored as a string.
    config_path: String,
    /// Format the configuration file was parsed with.
    config_format: ConfigFormat,
    /// Directory that contains the `waf` script, stored as a string.
    ns3_path: String,
    /// Number of in-flight tasks at which `execute` waits for one to finish
    /// before launching the next.
    task_concurrent: usize,
    /// Retry limit handed to every task launched by `execute`.
    retry_limit: u32,
    /// Parsed configuration entries, keyed by their name in the config file.
    pub configs: HashMap<String, T>,
    /// Finished tasks, keyed by the name of the configuration entry that
    /// produced them.
    pub outputs: HashMap<String, Vec<Task<P>>>,
}
37
/// Builder for `Executor`.
///
/// Every field is optional; `build` substitutes a default for each field that
/// is still `None`.
#[derive(Debug, Clone)]
pub struct ExecutorBuilder {
    /// Path of the configuration file. When `None`, `build` uses `config.<ext>`
    /// where `<ext>` follows the selected format.
    pub config_path: Option<String>,
    /// Format of the configuration file. When `None`, `build` uses TOML.
    pub config_format: Option<ConfigFormat>,
    /// Directory containing the `waf` script. When `None`, `build` uses `/`.
    pub ns3_path: Option<String>,
    /// Concurrency limit for tasks. When `None`, `build` uses `num_cpus::get()`.
    pub task_concurrent: Option<usize>,
    /// Retry limit for each task. When `None`, `build` uses
    /// `DEFAULT_RETRY_LIMIT`.
    pub retry_limit: Option<u32>,
}
46
/// Result of one finished NS3 program run.
#[derive(Debug, Clone)]
pub struct Task<P: BuildCmd> {
    /// Parameter set the program was launched with.
    pub param: P,
    /// Raw process output (exit status plus stdout/stderr bytes).
    pub output: Output,
    /// Standard output of the process as text.
    pub stdout: String,
    /// Standard error of the process as text.
    pub stderr: String,
}
54
55impl<T: Default + BuildParam<P>, P: BuildCmd> Executor<T, P> {
56 pub fn get_config_path(&self) -> &str {
57 &self.config_path
58 }
59
60 pub fn get_config_format(&self) -> &ConfigFormat {
61 &self.config_format
62 }
63
64 pub fn get_ns3_path(&self) -> &str {
65 &self.ns3_path
66 }
67
68 pub fn get_task_concurrent(&self) -> usize {
69 self.task_concurrent
70 }
71
72 pub fn get_retry_limit(&self) -> u32 {
73 self.retry_limit
74 }
75
76 pub fn get_configs(&self) -> &HashMap<String, T> {
77 &self.configs
78 }
79
80 pub fn get_outputs(&self) -> &HashMap<String, Vec<Task<P>>> {
81 &self.outputs
82 }
83
84 pub async fn execute(&mut self) -> Result<(), Error> {
85 let ns3_dir = Path::new(&self.ns3_path);
86 println!("========== Build NS3 Program ==========");
87 build_ns3_program(ns3_dir).await?;
88 println!("Build NS3 Successfully!");
89 println!("========== Execute NS3 Tasks ==========");
90 let mut tasks = FuturesUnordered::new();
91 let mut params_map: HashMap<&String, Vec<P>> = self
92 .configs
93 .iter()
94 .map(|(k, v)| (k, v.build_param()))
95 .collect();
96 let total_count = params_map.iter().map(|(_, v)| v.len()).sum::<usize>() as u64;
97 let mb = MultiBar::new();
98 mb.println("Launch NS3 Tasks: ");
99 let mut pb1 = mb.create_bar(total_count);
100 mb.println("Complete NS3 Tasks: ");
101 let mut pb2 = mb.create_bar(total_count);
102 let progress = spawn_blocking(move || {
103 mb.listen();
104 });
105 for params in params_map.drain() {
106 for param in params.1 {
107 pb1.inc();
108 tasks.push(execute_ns3_program(
109 params.0,
110 ns3_dir,
111 param,
112 self.retry_limit,
113 ));
114 if tasks.len() >= self.task_concurrent {
116 if let Some(t) = tasks.next().await {
117 let (n, t) = t?;
118 pb2.inc();
119 if let Some(v) = self.outputs.get_mut(n) {
120 v.push(t);
121 }
122 }
123 }
124 }
125 }
126 pb1.finish();
127 while let Some(t) = tasks.next().await {
129 let (n, t) = t?;
131 pb2.inc();
132 if let Some(v) = self.outputs.get_mut(n) {
133 v.push(t);
134 }
135 }
136 pb2.finish();
137 progress.await.unwrap();
138 Ok(())
139 }
140}
141
142fn check_config_file(config_path: &String, ext: &ConfigFormat) -> Result<PathBuf, Error> {
143 let config_file_path = match Path::new(&config_path).canonicalize() {
144 Ok(path) => path,
145 Err(e) => {
146 return Err(Error::FileNotFound(format!(
147 "Can not locate config file: {:?}.",
148 e
149 )));
150 }
151 };
152 match config_file_path.extension() {
153 Some(t) => match ext {
154 ConfigFormat::Ron => {
155 if t != "ron" {
156 return Err(Error::InvalidConfig(
157 "Config file must be a ron file.".to_string(),
158 ));
159 }
160 }
161 ConfigFormat::Json => {
162 if t != "json" {
163 return Err(Error::InvalidConfig(
164 "Config file must be a json file.".to_string(),
165 ));
166 }
167 }
168 ConfigFormat::Toml => {
169 if t != "toml" {
170 return Err(Error::InvalidConfig(
171 "Config file must be a toml file.".to_string(),
172 ));
173 }
174 }
175 ConfigFormat::Yaml => {
176 if t != "yaml" {
177 return Err(Error::InvalidConfig(
178 "Config file must be a yaml file.".to_string(),
179 ));
180 }
181 }
182 },
183 None => {
184 return Err(Error::InvalidConfig(
185 "Config file must have a valid file extension.".to_string(),
186 ));
187 }
188 }
189 Ok(config_file_path)
190}
191
192impl Default for ExecutorBuilder {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198impl ExecutorBuilder {
199 pub fn new() -> Self {
200 Self {
201 config_path: None,
202 config_format: None,
203 ns3_path: None,
204 task_concurrent: None,
205 retry_limit: None,
206 }
207 }
208
209 pub fn config_path(mut self, config_path: &str) -> Self {
210 self.config_path = Some(config_path.to_string());
211 self
212 }
213
214 pub fn config_format(mut self, config_format: ConfigFormat) -> Self {
215 self.config_format = Some(config_format);
216 self
217 }
218
219 pub fn ns3_path(mut self, ns3_path: &str) -> Self {
220 self.ns3_path = Some(ns3_path.to_string());
221 self
222 }
223
224 pub fn task_concurrent(mut self, task_concurrent: usize) -> Self {
225 self.task_concurrent = Some(task_concurrent);
226 self
227 }
228
229 pub fn retry_limit(mut self, retry_limit: u32) -> Self {
230 self.retry_limit = Some(retry_limit);
231 self
232 }
233
234 pub fn build<T: Default + BuildParam<P> + serde::de::DeserializeOwned, P: BuildCmd>(
235 self,
236 ) -> Result<Executor<T, P>, Error> {
237 let config_format = self.config_format.unwrap_or(ConfigFormat::Toml);
238 let mut config_path = self.config_path.unwrap_or_else(|| match &config_format {
239 ConfigFormat::Ron => "config.ron".to_string(),
240 ConfigFormat::Toml => "config.toml".to_string(),
241 ConfigFormat::Json => "config.json".to_string(),
242 ConfigFormat::Yaml => "config.yaml".to_string(),
243 });
244 let mut ns3_path = self.ns3_path.unwrap_or_else(|| "/".to_string());
245 let task_concurrent = self.task_concurrent.unwrap_or_else(num_cpus::get);
246 let retry_limit = self.retry_limit.unwrap_or(DEFAULT_RETRY_LIMIT as u32);
247 let config_file_path = check_config_file(&config_path, &config_format)?;
249 config_path = config_file_path.display().to_string();
250 let ns3_dir_path = match Path::new(&ns3_path).join("waf").canonicalize() {
252 Ok(path) => path,
253 Err(e) => {
254 return Err(Error::FileNotFound(format!(
255 "Can not locate ns3 dir: {:?}.",
256 e
257 )));
258 }
259 };
260 ns3_path = ns3_dir_path.parent().unwrap().display().to_string();
261 let configs: HashMap<String, T> = match config_format {
262 ConfigFormat::Ron => {
263 let f = std::fs::File::open(config_file_path)?;
264 ron::de::from_reader(f)?
265 }
266 ConfigFormat::Json => {
267 let f = std::fs::File::open(config_file_path)?;
268 serde_json::from_reader(f)?
269 }
270 ConfigFormat::Yaml => {
271 let f = std::fs::File::open(config_file_path)?;
272 serde_yaml::from_reader(f)?
273 }
274 ConfigFormat::Toml => {
275 let configuration = std::fs::read_to_string(config_file_path)?;
276 let configs: toml::value::Table = match toml::from_str(&configuration) {
277 Ok(t) => t,
278 Err(e) => {
279 return Err(Error::InvalidConfigFormat(format!(
280 "Config file is not a valid toml file. Err: {:?}.",
281 e
282 )));
283 }
284 };
285 configs
286 .iter()
287 .map(|(k, v)| (k.to_owned(), v.to_owned().try_into().unwrap()))
288 .collect()
289 }
290 };
291 let outputs: HashMap<String, Vec<Task<P>>> = configs
292 .iter()
293 .map(|(k, _)| (k.to_owned(), vec![]))
294 .collect();
295
296 Ok(Executor {
297 config_path,
298 config_format,
299 ns3_path,
300 task_concurrent,
301 retry_limit,
302 configs,
303 outputs,
304 })
305 }
306}
307
308impl<P: BuildCmd> Task<P> {
309 pub fn read_raw(&self) -> (Vec<u8>, Vec<u8>) {
310 let stdout = self.output.stdout.clone();
311 let stderr = self.output.stderr.clone();
312 (stdout, stderr)
313 }
314
315 pub fn read_raw_stdout(&self) -> Vec<u8> {
316 self.output.stdout.clone()
317 }
318
319 pub fn read_raw_stderr(&self) -> Vec<u8> {
320 self.output.stderr.clone()
321 }
322
323 pub fn read(&self) -> (&str, &str) {
324 (&self.stdout, &self.stderr)
325 }
326
327 pub fn read_stdout(&self) -> &str {
328 &self.stdout
329 }
330
331 pub fn read_stderr(&self) -> &str {
332 &self.stderr
333 }
334}
335
336async fn execute_ns3_program<P: BuildCmd>(
337 name: &str,
338 ns3_dir: impl AsRef<Path>,
339 param: P,
340 retry_limit: u32,
341) -> Result<(&str, Task<P>), Error> {
342 let waf_path = ns3_dir.as_ref().join("waf");
343 let argument = param.build_cmd();
344 let mut cnt = 1;
345 let mut output = match Command::new(waf_path.as_os_str())
346 .arg("--run-no-build")
347 .arg(&argument)
348 .current_dir(&ns3_dir)
349 .output()
350 .await
351 {
352 Ok(output) => output,
353 Err(e) => {
354 return Err(Error::ExecuteFail(format!(
355 "Failed to execute NS3 program. Err: {:?}.",
356 e
357 )));
358 }
359 };
360 while !output.status.success() && cnt <= retry_limit {
361 cnt += 1;
362 if cnt > retry_limit {
363 return Err(Error::RetryLimitExceed);
364 }
365 output = match Command::new(waf_path.as_os_str())
366 .arg("--run-no-build")
367 .arg(&argument)
368 .current_dir(&ns3_dir)
369 .output()
370 .await
371 {
372 Ok(output) => output,
373 Err(e) => {
374 return Err(Error::ExecuteFail(format!(
375 "Failed to execute NS3 program. Err: {:?}.",
376 e
377 )));
378 }
379 };
380 }
381 let stdout = String::from_utf8(output.stdout.clone()).unwrap();
382 let stderr = String::from_utf8(output.stderr.clone()).unwrap();
383 Ok((
384 name,
385 Task {
386 param,
387 output,
388 stdout,
389 stderr,
390 },
391 ))
392}
393
394async fn build_ns3_program(ns3_dir: impl AsRef<Path>) -> Result<(), Error> {
395 let waf_path = ns3_dir.as_ref().join("waf");
396 let output = match Command::new(waf_path.as_os_str())
397 .arg("build")
398 .current_dir(&ns3_dir)
399 .output()
400 .await
401 {
402 Ok(output) => output,
403 Err(e) => {
404 return Err(Error::ExecuteFail(format!(
405 "Failed to execute NS3 program. Err: {:?}.",
406 e
407 )));
408 }
409 };
410 if output.status.success() {
411 Ok(())
412 } else {
413 Err(Error::BuildFail(format!(
414 "Failed to build NS3 program. Err: \n{:?}.\n",
415 String::from_utf8(output.stderr).unwrap()
416 )))
417 }
418}