1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use crate::common::task::TaskFlag;
use chrono::Datelike;
use chrono::Timelike;
use std::time::{Duration, SystemTime};
use tokio::time;
use tracing::{error, info};
use crate::global::{get_all, start};
/// Scans every registered task once and starts the ones that are due.
///
/// The task list is loaded with `get_all()`. Whether a task is started depends
/// on its type and its current `status` string:
///
/// * **Scheduled** – the execution time is assembled from the scheduled fields,
///   with every missing field replaced by the matching component of the current
///   local time. The task is started when that time is not in the future, lies
///   less than `delay` seconds in the past, and the status is `"waiting"`.
///   A field combination that does not form a valid date/time is logged as an
///   error and skipped.
/// * **Async** – started when the status is `"auto restart"`.
/// * **Periodic** – started when the current Unix time has reached
///   `started_after`, at least `interval` seconds have passed since `last_run`,
///   and the status is `"interval"` (or `"executing"` when `sync` is set).
/// * **None** – ignored.
///
/// # Errors
///
/// Returns the error from `get_all()`, from the first failing `start()` call
/// (the remaining tasks are then not examined in this pass), or from reading the
/// system clock when it is set before the Unix epoch.
pub async fn rerun_tasks(delay: u64) -> Result<(), Box<dyn std::error::Error>> {
    let tasks = get_all().await?;
    let now = chrono::Local::now();
    // Local wall-clock time as whole seconds. It is loop-invariant, so compute it once.
    // NOTE(review): `NaiveDateTime::timestamp` may be deprecated in newer chrono
    // releases — confirm against the pinned version before upgrading.
    let now_local_secs = now.naive_local().timestamp();

    for (id, task) in tasks {
        let should_start = match task.task_type {
            crate::common::task::TaskType::Scheduled(scheduled) => {
                let date = chrono::NaiveDate::from_ymd_opt(
                    scheduled.year.unwrap_or(now.year()),
                    scheduled.month.unwrap_or(now.month()),
                    scheduled.day.unwrap_or(now.day()),
                );
                let time = chrono::NaiveTime::from_hms_opt(
                    scheduled.hour.unwrap_or(now.hour()),
                    scheduled.minute.unwrap_or(now.minute()),
                    scheduled.second.unwrap_or(now.second()),
                );
                match (date, time) {
                    (Some(date), Some(time)) => {
                        let exec_local_secs = chrono::NaiveDateTime::new(date, time).timestamp();
                        // Seconds by which the execution time has already passed;
                        // negative while the execution time is still in the future.
                        let overdue_by = now_local_secs - exec_local_secs;
                        let in_window = overdue_by >= 0 && (overdue_by as u64) < delay;
                        in_window && task.status.map_or(false, |status| status == "waiting")
                    }
                    _ => {
                        error!("Invalid scheduled task: {}", id);
                        false
                    }
                }
            }
            crate::common::task::TaskType::Async(_) => {
                let restart = task
                    .status
                    .map_or(false, |status| status == "auto restart");
                if restart {
                    info!("Restart task: {}", id);
                }
                restart
            }
            crate::common::task::TaskType::Periodic(tt) => {
                // Propagate a pre-epoch clock as an error instead of panicking.
                let epoch_secs = SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)?
                    .as_secs();
                // `saturating_sub` keeps a `last_run` that is ahead of the clock from
                // underflowing (panic in debug builds, spurious run in release builds).
                let interval_elapsed = epoch_secs >= tt.started_after
                    && epoch_secs.saturating_sub(tt.last_run) >= tt.interval;
                let runnable = interval_elapsed
                    && task.status.map_or(false, |status| {
                        status == "interval" || (tt.sync && status == "executing")
                    });
                if runnable {
                    info!("Execute periodic task: {}", id);
                }
                runnable
            }
            crate::common::task::TaskType::None => false,
        };

        if should_start {
            start(TaskFlag {
                id,
                name: None,
                group: None,
                mat: false,
            })
            .await?;
        }
    }
    Ok(())
}
/// Runs the task monitor forever, calling [`rerun_tasks`] once per period.
///
/// `delay` is the period in seconds and defaults to 5 when `None`. A value of
/// zero is raised to one second, because `tokio::time::interval` panics on a
/// zero period. The same value is passed to `rerun_tasks` as its `delay`.
///
/// Errors returned by `rerun_tasks` are logged and do not stop the loop, so
/// this function never returns under normal operation.
pub async fn run_monitor(delay: Option<u64>) -> Result<(), Box<dyn std::error::Error>> {
    let delay = delay.unwrap_or(5).max(1);
    let mut interval = time::interval(Duration::from_secs(delay));
    loop {
        // The first tick completes immediately; ticking before the scan avoids
        // running two scans back-to-back at startup.
        interval.tick().await;
        if let Err(e) = rerun_tasks(delay).await {
            error!("Monitor tasks error: {}", e);
        }
    }
}