pub mod threads {
use log::*;
use tokio::{runtime,time};
use crossbeam::channel::{unbounded,Receiver,Sender};
use std::{ thread,sync::Mutex,sync::Once,time::Duration,
sync::atomic::AtomicI8, sync::atomic::Ordering,
sync::Arc };
use chrono::{Local};
use crate::schedule::schedule::{TaskAction};
use crate::errors::errors::{TError, TResult, TErrorKind};
use std::borrow::{BorrowMut, Borrow};
use std::ops::Add;
use crate::uuid::uuid::next_big_id;
use crate::parsers::parsers::parser_timestamp;
use std::sync::mpsc::channel;
use std::future::Future;
use tokio::task::JoinHandle;
pub struct TaskPool {
rt:tokio::runtime::Runtime,
stop_tx:Sender<u64>,
stop_rx:Receiver<u64>,
debug:bool,
}
impl TaskPool {
/// Creates a pool backed by a freshly built multi-threaded Tokio runtime.
///
/// # Arguments
/// * `tick` - currently not read by this constructor; kept so existing
///   callers keep compiling.
/// * `count` - requested number of runtime worker threads. Values below `1`
///   are raised to `1`: Tokio rejects a worker count of zero, and a negative
///   value would otherwise wrap into an enormous `usize`.
///
/// The pool starts with debug logging disabled and an unbounded stop channel.
///
/// # Panics
/// Panics if the Tokio runtime cannot be built.
pub fn new(tick: Duration, count: i32) -> TaskPool {
    // Bind the parameter explicitly so the unused argument is a visible,
    // deliberate choice rather than a compiler warning.
    let _ = tick;
    // After the clamp the value is at least 1, so the cast cannot wrap.
    let workers = count.max(1) as usize;
    let (tx, rx) = unbounded();
    TaskPool {
        rt: runtime::Builder::new_multi_thread()
            .worker_threads(workers)
            .enable_all()
            .build()
            .expect("failed to build the Tokio runtime for TaskPool"),
        stop_tx: tx,
        stop_rx: rx,
        debug: false,
    }
}
/// Replaces the pool's runtime with a newly built one and updates the debug
/// flag.
///
/// # Arguments
/// * `count` - requested number of runtime worker threads. Values below `1`
///   are raised to `1`: Tokio rejects a worker count of zero, and a negative
///   value would otherwise wrap into an enormous `usize`.
/// * `debug` - new value of the debug-logging flag; it affects tasks spawned
///   after this call, because each task captures the flag when it is spawned.
///
/// The previous runtime is dropped by the assignment.
/// NOTE(review): per Tokio's documentation dropping a runtime shuts it down,
/// so tasks spawned on the old runtime presumably have to be spawned again by
/// the caller — confirm against call sites.
///
/// # Panics
/// Panics if the Tokio runtime cannot be built.
pub fn rebuild(&mut self, count: i32, debug: bool) {
    self.debug = debug;
    // After the clamp the value is at least 1, so the cast cannot wrap.
    let workers = count.max(1) as usize;
    self.rt = runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("failed to rebuild the Tokio runtime for TaskPool");
}
/// Queues a stop request for the task identified by `id`.
///
/// The id is pushed onto the pool's stop channel without blocking. A running
/// task acts on it the next time it polls that channel, so the stop is not
/// immediate.
///
/// # Errors
/// Returns a [`TError`] of kind [`TErrorKind::Other`] carrying the channel's
/// error text when the id cannot be sent.
pub fn stop_task(&mut self, id: u64) -> TResult<()> {
    self.stop_tx
        .try_send(id)
        .map_err(|send_err| TError::new(TErrorKind::Other(send_err.to_string())))
}
/// Drives `future` to completion on the pool's runtime and returns its
/// output.
///
/// This forwards directly to Tokio's `Runtime::block_on`; see that method's
/// documentation for the contexts in which it may be called.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
    let runtime = &self.rt;
    runtime.block_on(future)
}
/// Spawns an arbitrary `future` onto the pool's runtime.
///
/// Returns the Tokio [`JoinHandle`] for the spawned future, which the caller
/// may await to obtain the future's output.
pub fn spawn_rt<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output>
where
    F::Output: Send + 'static,
{
    let runtime = &self.rt;
    runtime.spawn(future)
}
pub fn spawn(&self, t:Arc<dyn TaskAction>) {
let task = t.clone();
let rx_spwan = self.stop_rx.clone();
let debug = self.debug;
self.rt.spawn(async move {
let mut r_count = 0;
let max_count = task.loop_count();
loop {
if task.date_format().len() > 0 {
let now_time = Local::now().timestamp();
let next_tick = parser_timestamp(task.date_format()).unwrap();
if debug {
debug!("make next tick sec:{} id:{}",(next_tick - now_time) as u64,task.id());
}
time::sleep(time::Duration::from_secs( (next_tick - now_time) as u64 )).await;
}else {
if task.tick() <= 0 {
break }
if debug {
debug!("make next ticker sec:{} id:{}",(task.tick()) as u64,task.id());
}
time::sleep(time::Duration::from_millis( task.tick() )).await;
}
if max_count > 0 && r_count >= max_count {
if debug {
debug!("task finished:{}",task.id());
}
break; }
r_count+=1; if debug {
debug!("task run count:{} id:{}",r_count,task.id());
}
task.execute(task.id());
let sr = rx_spwan.try_recv();
match sr {
Err(e) => { },
Ok(val) => {
if task.id() == val {
debug!("task stopped:{}",task.id());
return;
}
},
}
}
});
}
}
}