#![allow(unused_must_use)]
#![allow(dead_code)]
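//! A small asynchronous task pool: tasks are queued on a lock-free `SegQueue` and executed
//! by Tokio workers whose count floats between `min` and `max`, as configured through
//! `TaskPoolType`. `run` enqueues immediately, `run_ease` applies backpressure once the
//! backlog exceeds `max_cache`, and `close` shuts the pool down.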

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use crossbeam_queue::SegQueue;
use async_trait::async_trait;
use async_trait::async_trait;

/// Sizing policy for the pool: `Default` keeps the built-in bounds (min 1, effectively no
/// upper limit), `Max`/`Min`/`Range` override one or both bounds, and `Calculate` sizes the
/// pool to the CPU count (currently a fixed placeholder).
pub enum TaskPoolType{
    Default,
    Max(u32),
    Min(u32),
    Range(u32, u32),
    Calculate,
}

#[async_trait]
pub trait Task{
    async fn handle(&self);
}

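/// No-op task; `close()` pushes these onto the queue as placeholders during shutdown.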
pub struct TaskDefault{}
#[async_trait]
impl Task for TaskDefault{
    async fn handle(&self){}
}


pub struct TaskEntity{
    /// Pending tasks waiting to be picked up by a worker.
    task_query: Arc<SegQueue<Arc<dyn Task + Send + Sync + 'static>>>,
    /// Upper bound on the number of concurrent workers.
    max: u32,
    /// Lower bound on the number of workers; idle workers above this count exit.
    min: u32,
    /// Number of workers currently alive.
    now: Arc<AtomicU32>,
    /// Reserved counter; not used by the current implementation.
    worker_count: Arc<AtomicU32>,
    /// Running flag; cleared by `close()` to stop the workers.
    status: Arc<AtomicBool>,
    /// Backlog threshold used by `run_ease` for backpressure.
    max_cache: usize,
}

impl TaskEntity
{
    pub fn new(t: TaskPoolType, cache: usize) -> TaskEntity {
        let mut te = TaskEntity{
            task_query: Arc::new(SegQueue::new()),
            // Effectively unbounded until one of the variants below narrows it.
            max: 1 << 31,
            min: 1,
            now: Arc::new(AtomicU32::new(0)),
            worker_count: Arc::new(AtomicU32::new(0)),
            status: Arc::new(AtomicBool::new(true)),
            max_cache: cache,
        };
        match t {
            TaskPoolType::Default=>{
                return te;
            }
            TaskPoolType::Max(max)=>{
                te.max = max;
            }
            TaskPoolType::Min(min)=>{
                te.min = min;
            }
            TaskPoolType::Range(min,max)=>{
                te.max = max;
                te.min = min;
            }
            TaskPoolType::Calculate=>{
                // Size the pool to the CPU count. A fixed placeholder of 8 is used here;
                // swap in a real probe (e.g. `num_cpus::get()`) if that crate is a dependency.
                let num = 8;
                te.max = num;
                te.min = num;
            }
        }
        te
    }
    pub fn init(&self){
        // `Runtime` cancels its spawned tasks when dropped, so the runtime is deliberately
        // leaked here to keep the workers alive after this call returns. Callers that are
        // already inside a Tokio runtime should prefer `init_task_pool().await` instead.
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(self.init_task_pool());
        std::mem::forget(rt);
    }
    /// Spawn the initial `min` workers; must be called from within a Tokio runtime.
    pub async fn init_task_pool(&self){
        for _ in 0..self.min{
            self.make_worker();
        }
    }
    /// Worker loop: pops and executes tasks until the pool is closed, or until the queue
    /// runs dry and the worker count can drop back to `min`.
    pub async fn worker(receiver: Arc<SegQueue<Arc<dyn Task + Send + Sync + 'static>>>, now: Arc<AtomicU32>, status: Arc<AtomicBool>, max: u32, min: u32){
        // Reserve a worker slot; back out if the pool is already at capacity.
        let s = now.fetch_add(1, Ordering::Relaxed);
        if s >= max {
            now.fetch_sub(1, Ordering::Relaxed);
            return;
        }
        while status.load(Ordering::Relaxed) {
            match receiver.pop() {
                Some(task) => {
                    // println!("worker {} is handling a task", s);
                    task.handle().await;
                }
                None => {
                    // Queue is empty: surplus workers exit, the core `min` workers wait and retry.
                    if now.load(Ordering::Relaxed) > min {
                        break;
                    }
                    tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
                }
            }
        }
        now.fetch_sub(1, Ordering::Relaxed);
    }
    /// Spawn one worker task on the current Tokio runtime.
    pub fn make_worker(&self){
        let rece = self.task_query.clone();
        let arc_now = Arc::clone(&self.now);
        let status = self.status.clone();
        let max = self.max;
        let min = self.min;
        tokio::spawn(async move{
            Self::worker(rece, arc_now, status, max, min).await;
        });
    }
    /// Push the task onto the execution queue immediately and return the current queue
    /// length. If `max` caps the worker count, tasks can pile up in the queue and memory
    /// usage may grow sharply.
    pub fn run(&self, f: Arc<dyn Task + Send + Sync + 'static>) -> usize {
        self.task_query.push(f);
        let works = self.now.load(Ordering::Relaxed);
        let now_tasks = self.task_query.len() as u32;
        // Spawn an extra worker while there is a backlog and the pool is below `max`.
        if works < self.max && now_tasks > self.min && now_tasks < self.max {
            self.make_worker();
        }
        self.task_query.len()
    }
    /// Backpressure variant of `run`: if the backlog exceeds the configured threshold
    /// (`max_cache`), wait for it to drain before pushing the task.
    pub async fn run_ease(&self, f: Arc<dyn Task + Send + Sync + 'static>){
        let now_tasks = self.task_query.len() as u32;
        let works = self.now.load(Ordering::Relaxed);
        if works < self.max && now_tasks > self.min && now_tasks < self.max {
            self.make_worker();
        }
        // Wait until the backlog drops below the threshold before enqueueing.
        while self.task_query.len() > self.max_cache {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
        self.task_query.push(f);
    }

    /// Close the task pool and finish executing the tasks that are already queued.
    pub fn close(&self){
        self.status.store(false, Ordering::Relaxed);
        // Enqueue one no-op placeholder task per pending task while the workers wind down.
        let mut len = self.task_query.len();
        while len > 0 {
            self.run(Arc::new(TaskDefault{}));
            len -= 1;
        }
    }
}
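
// A minimal usage sketch, assuming the `tokio` "macros" and "rt-multi-thread" features are
// enabled in Cargo.toml. The `Count` task below is a made-up example type, not part of the
// pool itself.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use async_trait::async_trait;

    /// Hypothetical task that just bumps a shared counter.
    struct Count(Arc<AtomicU32>);

    #[async_trait]
    impl Task for Count {
        async fn handle(&self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn runs_queued_tasks() {
        let counter = Arc::new(AtomicU32::new(0));
        // 2..4 workers; `run_ease` starts throttling once 100 tasks are queued.
        let pool = TaskEntity::new(TaskPoolType::Range(2, 4), 100);
        pool.init_task_pool().await;

        for _ in 0..10 {
            pool.run(Arc::new(Count(counter.clone())));
        }
        pool.run_ease(Arc::new(Count(counter.clone()))).await;

        // Give the workers a moment to drain the queue, then shut the pool down.
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        pool.close();
        assert!(counter.load(Ordering::Relaxed) >= 1);
    }
}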