use crate::{
config::read_config, cronjob::CronJob, job_builder::JobBuilder, logger, utils, web_server,
CronFilter, CronJobType,
};
use chrono::Duration;
use crossbeam_channel::{Receiver, Sender};
use rocket::Shutdown;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
thread::JoinHandle,
};
const GRACE_DEFAULT: u32 = 250;
pub struct CronFrame {
pub cron_jobs: Mutex<Vec<CronJob>>,
job_handles: Mutex<HashMap<String, JoinHandle<()>>>,
_logger: Option<log4rs::Handle>,
pub web_server_channels: (Sender<Shutdown>, Receiver<Shutdown>),
pub filter: Option<CronFilter>,
server_handle: Mutex<Option<Shutdown>>,
pub quit: Mutex<bool>,
pub grace: u32,
pub running: Mutex<bool>,
}
impl CronFrame {
pub fn default() -> Arc<CronFrame> {
CronFrame::init(None, true)
}
pub fn init(filter: Option<CronFilter>, use_logger: bool) -> Arc<CronFrame> {
println!("Starting CronFrame...");
let logger = if use_logger {
Some(logger::rolling_logger())
} else {
None
};
let frame = CronFrame {
cron_jobs: Mutex::new(vec![]),
job_handles: Mutex::new(HashMap::new()),
_logger: logger,
web_server_channels: crossbeam_channel::bounded(1),
filter,
server_handle: Mutex::new(None),
quit: Mutex::new(false),
grace: {
if let Some(config_data) = read_config() {
if let Some(scheduler_data) = config_data.scheduler {
scheduler_data.grace.unwrap_or_else(|| 250)
} else {
GRACE_DEFAULT
}
} else {
GRACE_DEFAULT
}
},
running: Mutex::new(false),
};
info!("CronFrame Init Start");
info!("Graceful Period {} ms", frame.grace);
info!("Colleting Global Jobs");
for job_builder in inventory::iter::<JobBuilder> {
let cron_job = job_builder.clone().build();
info!("Found Global Job \"{}\"", cron_job.name);
frame
.cron_jobs
.lock()
.expect("global job gathering error during init")
.push(cron_job)
}
info!("Global Jobs Collected");
info!("CronFrame Init Complete");
info!("CronFrame Server Init");
let frame = Arc::new(frame);
let server_frame = frame.clone();
let running = Mutex::new(false);
std::thread::spawn(move || web_server::web_server(server_frame));
*frame
.server_handle
.lock()
.expect("web server handle unwrap error") = match frame.web_server_channels.1.recv() {
Ok(handle) => {
*running.lock().unwrap() = true;
Some(handle)
}
Err(error) => {
error!("Web server shutdown handle error: {error}");
None
}
};
if *running.lock().unwrap() {
let (ip_address, port) = utils::ip_and_port();
info!(
"CronFrame Web Server running at http://{}:{}",
ip_address, port
);
println!("CronFrame running at http://{}:{}", ip_address, port);
}
frame
}
pub fn add_job(self: &Arc<CronFrame>, job: CronJob) -> Arc<CronFrame> {
self.cron_jobs
.lock()
.expect("add_job unwrap error on lock")
.push(job);
self.clone()
}
pub fn new_job(
self: Arc<CronFrame>,
name: &str,
job: fn(),
cron_expr: &str,
timeout: &str,
) -> Arc<CronFrame> {
self.add_job(JobBuilder::global_job(name, job, cron_expr, timeout).build())
}
pub fn start_scheduler<'a>(self: &Arc<Self>) -> Arc<Self> {
let cronframe = self.clone();
let ret = cronframe.clone();
if *self.running.lock().unwrap() {
return ret;
}
*cronframe
.running
.lock()
.expect("running unwrap error in quit start_scheduler method") = true;
*cronframe
.quit
.lock()
.expect("quit unwrap error in start_scheduler method") = false;
let scheduler = move || loop {
std::thread::sleep(Duration::milliseconds(500).to_std().unwrap());
if *cronframe
.quit
.lock()
.expect("quit unwrap error in scheduler")
{
break;
}
if !*cronframe
.running
.lock()
.expect("quit unwrap error in scheduler")
{
break;
}
let mut cron_jobs = cronframe
.cron_jobs
.lock()
.expect("cron jobs unwrap error in scheduler");
let mut jobs_to_remove: Vec<usize> = Vec::new();
for (i, cron_job) in &mut (*cron_jobs).iter_mut().enumerate() {
if let Some(filter) = &cronframe.filter {
let job_type = match cron_job.job {
CronJobType::Global(_) => CronFilter::Global,
CronJobType::Function(_) => CronFilter::Function,
CronJobType::Method(_) => CronFilter::Method,
CronJobType::CLI => CronFilter::CLI,
};
if job_type != *filter {
continue;
}
}
let job_id = format!("{} ID#{}", cron_job.name, cron_job.id);
let to_be_deleted = if let Some((_, life_rx)) = cron_job.life_channels.clone() {
match life_rx.try_recv() {
Ok(message) => {
if message == "JOB_DROP" {
info!("job @{} - Dropped", job_id);
jobs_to_remove.push(i);
true
} else {
false
}
}
Err(_error) => false,
}
} else {
false
};
let mut job_handlers = cronframe
.job_handles
.lock()
.expect("job handles unwrap error in scheduler");
cron_job.reset_timeout();
if !job_handlers.contains_key(&job_id) && !to_be_deleted {
if cron_job.suspended {
continue;
}
if cron_job.check_timeout() {
if !cron_job.timeout_notified {
info!("job @{} - Reached Timeout", job_id);
cron_job.timeout_notified = true;
}
continue;
}
let handle = (*cron_job).try_schedule(cronframe.grace);
if handle.is_some() {
job_handlers.insert(
job_id.clone(),
handle.expect("job handle unwrap error after try_schedule"),
);
info!(
"job @{} RUN_ID#{} - Scheduled",
job_id,
cron_job.run_id.as_ref().expect("run_id unwrap error")
);
}
}
else if let Some((_, status_rx)) = cron_job.status_channels.clone() {
match status_rx.try_recv() {
Ok(message) => {
if message == "JOB_COMPLETE" {
info!(
"job @{} RUN_ID#{} - Completed",
job_id,
cron_job.run_id.as_ref().unwrap()
);
job_handlers.remove(job_id.as_str());
cron_job.run_id = None;
} else if message == "JOB_ABORT" {
info!(
"job @{} RUN_ID#{} - Aborted",
job_id,
cron_job.run_id.as_ref().unwrap()
);
job_handlers.remove(job_id.as_str());
cron_job.run_id = None;
cron_job.failed = true;
}
}
Err(_error) => {}
}
}
}
if !jobs_to_remove.is_empty() {
let num_jobs = jobs_to_remove.len();
for i in 0..num_jobs {
cron_jobs.remove(jobs_to_remove[i]);
for j in i + 1..num_jobs {
jobs_to_remove[j] -= 1;
}
}
}
};
std::thread::spawn(scheduler);
info!("CronFrame Scheduler Running");
ret
}
pub fn keep_alive(self: &Arc<Self>) {
loop {
std::thread::sleep(Duration::milliseconds(500).to_std().unwrap());
if *self.quit.lock().unwrap() {
break;
}
}
}
pub fn run(self: &Arc<Self>) {
self.start_scheduler().keep_alive();
}
pub fn stop_scheduler(self: &Arc<Self>) {
info!("CronFrame Scheduler Shutdown");
*self.running.lock().unwrap() = false;
}
pub fn quit(self: &Arc<Self>) {
self.stop_scheduler();
info!("CronFrame Shutdown");
let cronframe = self.clone();
let handles = cronframe
.job_handles
.lock()
.expect("job handles unwrap error in stop scheduler method");
for handle in handles.iter() {
while !handle.1.is_finished() {
}
}
self.server_handle
.lock()
.expect("web server unwrap error in quit method")
.clone()
.expect("web server unwrap error after clone in quit method")
.notify();
*self
.quit
.lock()
.expect("quit unwrap error in stop scheduler method") = true;
}
}