use crate::executor::Executor;
use crate::settings::pong_settings::{TaskGroupSettings, TaskType};
use crate::targets::TargetStatus;
use crate::task::http::http_executor::HttpExecutor;
use crate::task::icmp::icmp_executor::IcmpExecutor;
use crate::task::tcp::tcp_executor::TcpExecutor;
use log::{debug, error, info, trace};
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use tokio::time::{sleep, Instant};
#[derive(Clone)]
struct Task {
task_type: TaskType,
target: String,
target_status_tx: Sender<TargetStatus>,
executor: Arc<dyn Executor + Send + Sync>,
}
impl Debug for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Task")
.field("task_type", &self.task_type)
.field("target", &self.target)
.field("executor", &self.executor.get_name())
.finish()
}
}
pub struct Scheduler {
target_status_tx: Sender<TargetStatus>,
}
impl Scheduler {
pub fn new(target_status_tx: Sender<TargetStatus>) -> Self {
Self { target_status_tx }
}
pub fn start(&self, task_groups: Vec<TaskGroupSettings>) {
debug!("启动任务调度器...");
for task_group in task_groups.into_iter() {
info!("添加任务组: {:?}", task_group);
let tasks: Arc<Vec<Task>> = Arc::new(
task_group
.tasks
.iter()
.map(|task| Task {
task_type: task.task_type.clone(),
target: task.target.clone(),
target_status_tx: self.target_status_tx.clone(),
executor: match task.task_type {
TaskType::ICMP => Arc::new(IcmpExecutor::new(
task.target.clone(),
task_group.timeout.unwrap(),
)),
TaskType::TCP => Arc::new(TcpExecutor::new(
task.target.clone(),
task_group.timeout.unwrap(),
)),
TaskType::HTTP => Arc::new(HttpExecutor::new(
task.target.clone(),
task_group.timeout.unwrap(),
)),
},
})
.collect(),
);
let duration = task_group.interval.unwrap();
let tasks_clone = Arc::clone(&tasks); tokio::spawn(async move {
loop {
for task in tasks_clone.iter() {
Self::exec_task(task.clone()).await;
}
sleep(duration).await;
}
});
}
}
async fn exec_task(task: Task) {
let start_time = Instant::now();
let task_desc = format!("{:?}", task);
let executor_name = task.executor.get_name().clone();
trace!("开始执行任务: {}:{}", executor_name, task_desc);
let elapsed = match task.executor.exec().await {
Ok(_) => {
let elapsed = start_time.elapsed().as_millis() as i64;
info!(
"Ping {} --> {} --> Pong in {} ms",
executor_name, task.target, elapsed
);
elapsed
}
Err(e) => {
error!(
"Ping {} --> {} --> Failed {}",
executor_name, task.target, e
);
-1
}
};
let target_status = TargetStatus {
task_type: task.task_type.clone(),
target: task.target.clone(),
elapsed,
};
trace!("更新目标状态: {:?}", target_status);
task.target_status_tx.send(target_status).unwrap();
}
}