1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::exec::FutureSig;
use crate::report::Report;
use crate::task::RunMode;
use crate::task::{Task, TaskGroup};
use crate::task_group::task_group;
use crate::task_item::task_item;
use chrono::Utc;
use futures::future::lazy;
use futures::stream::iter_ok;
use futures::sync::mpsc::Sender;
use futures::Future;
use futures::Sink;
use futures::Stream;
use std::sync::{Arc, Mutex};
pub fn task_seq(group: TaskGroup, sender: Sender<Report>) -> FutureSig {
let id_clone = group.id.clone();
let begin_clone = sender.clone();
Box::new(lazy(move || {
let track: Arc<Mutex<Vec<Result<Report, Report>>>> = Arc::new(Mutex::new(vec![]));
let c1 = track.clone();
let c2 = track.clone();
let items = group.items.into_iter().map(move |item| match item {
Task::Item(item) => task_item(item, sender.clone()),
Task::Group(group) => match group.run_mode {
RunMode::Series => task_seq(group, sender.clone()),
RunMode::Parallel => task_group(group, sender.clone()),
},
});
let begin_time = Utc::now();
begin_clone
.clone()
.send(Report::GroupStarted {
id: id_clone.clone(),
time: begin_time.clone(),
})
.then(move |_res: _| {
iter_ok(items)
.for_each(move |this_future| {
let results = c1.clone();
this_future
.then(move |x| {
let mut next = results.lock().unwrap();
match x {
Ok(Ok(s)) => {
next.push(Ok(s));
Ok(())
}
Ok(Err(s)) => {
next.push(Err(s));
Err(())
}
Err(e) => {
next.push(Err(e));
Err(())
}
}
})
.map(|_e| ())
})
.then(move |_res| {
let next = c2.clone();
let reports = next.lock().unwrap();
let all_valid = reports.iter().all(|x| x.is_ok());
if all_valid {
Ok(Ok(Report::EndGroup {
time: Utc::now(),
id: id_clone,
dur: Utc::now().signed_duration_since(begin_time),
reports: reports.clone(),
}))
} else {
Ok(Err(Report::ErrorGroup {
time: Utc::now(),
id: id_clone,
dur: Utc::now().signed_duration_since(begin_time),
reports: reports.clone(),
}))
}
})
})
}))
}