1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::time::{Instant, Duration};
use std::sync::{Arc, RwLock};
use futures::Future;
use futures::future;
use futures::Stream;
use tokio;
use tokio_timer::Interval;
use opcua_types::service_types::ServerState as ServerStateType;
use crate::state::ServerState;
pub struct PollingAction {}
impl PollingAction {
pub fn spawn<F>(server_state: Arc<RwLock<ServerState>>, interval_ms: u64, action: F) -> PollingAction
where F: 'static + Fn() + Send
{
let server_state_take_while = server_state.clone();
let f = Interval::new(Instant::now(), Duration::from_millis(interval_ms))
.take_while(move |_| {
trace!("polling action.take_while");
let server_state = trace_read_lock_unwrap!(server_state_take_while);
let abort = match server_state.state() {
ServerStateType::Failed |
ServerStateType::NoConfiguration |
ServerStateType::Shutdown => {
true
}
_ => {
server_state.is_abort()
}
};
if abort {
debug!("Polling action is stopping due to server state / abort");
}
future::ok(!abort)
})
.for_each(move |_| {
let process_action = {
let server_state = trace_read_lock_unwrap!(server_state);
server_state.is_running()
};
if process_action {
action();
}
Ok(())
})
.map_err(|_| ());
let _ = tokio::spawn(f);
PollingAction {}
}
}