1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::BalancingStrategy;
use tokio::sync::RwLock;
use std::ops::DerefMut;

/// Selection order used by `Polling` when walking its weighted node list.
enum Priority {
    // Depth first: the current node keeps being returned until its weight
    // is used up, then the cursor moves on to the next node.
    DepthFirst,
    // Breadth first: each pass hands out nodes one after another; later
    // passes only include nodes whose weight exceeds the pass level.
    BreadthFirst,
}

/// Weighted polling (round-robin style) balancer over keys of type `K`.
///
/// Nodes are stored as `(key, weight)` pairs; `priority` decides whether
/// selection runs depth-first or breadth-first.
pub struct Polling<K>{
    // Selection mode; set by `new` (depth-first) and the builder methods.
    priority:Priority,
    // Depth-first: how many times the current node has been handed out.
    // Breadth-first: the current pass level that a node's weight must exceed.
    weight:AtomicUsize,
    // Cursor into `list`: the current node (depth-first) or the position
    // where the next scan starts (breadth-first).
    index:AtomicUsize,
    // Registered `(key, weight)` pairs behind an async read/write lock.
    list:RwLock<Vec<(K,usize)>>
}
impl<K: Clone + Eq + Send> Polling<K> {
    /// Creates an empty balancer in depth-first mode with both cursors at zero.
    pub fn new() -> Self {
        Self {
            priority: Priority::DepthFirst,
            weight: AtomicUsize::new(0),
            index: AtomicUsize::new(0),
            list: RwLock::new(Vec::new()),
        }
    }

    /// Builder-style switch to depth-first selection; returns the balancer.
    pub fn depth_first(mut self) -> Self {
        self.priority = Priority::DepthFirst;
        self
    }

    /// Builder-style switch to breadth-first selection; returns the balancer.
    pub fn breadth_first(mut self) -> Self {
        self.priority = Priority::BreadthFirst;
        self
    }

    /// Depth-first pick: keeps returning the current node until it has been
    /// handed out `weight` times, then moves on to the next node.
    ///
    /// Nodes whose weight is zero are never returned. Returns `None` when the
    /// list is empty or when no node has a positive weight.
    async fn depth(&self) -> Option<K> {
        // The write lock is taken although the list itself is only read: it
        // serialises the updates to the `index` / `weight` cursors below.
        let guard = self.list.write().await;
        let nodes = guard.as_slice();
        if nodes.is_empty() {
            return None;
        }

        let mut cursor = self.index.load(Ordering::Relaxed);

        // Current node still has quota left: serve it again.
        // `get` also covers a cursor left out of range by a removal.
        if let Some((key, quota)) = nodes.get(cursor) {
            let served = self.weight.load(Ordering::Relaxed);
            if served < *quota {
                self.weight.store(served.wrapping_add(1), Ordering::Relaxed);
                return Some(key.clone());
            }
        }

        // Quota exhausted: advance (wrapping to the front) to the next node
        // that has a non-zero weight. Every slot is probed at most once, the
        // current one last, so a lone positive-weight node starts a new cycle.
        for _ in 0..nodes.len() {
            cursor = if cursor.wrapping_add(1) >= nodes.len() {
                0
            } else {
                cursor + 1
            };
            let (key, quota) = &nodes[cursor];
            if *quota > 0 {
                self.index.store(cursor, Ordering::Relaxed);
                // This call already counts as the first hand-out of the node.
                self.weight.store(1, Ordering::Relaxed);
                return Some(key.clone());
            }
        }

        // Every registered node has weight zero.
        None
    }

    /// Breadth-first pick: scans forward from the cursor and returns the
    /// first node whose weight is greater than the current pass level.
    ///
    /// When the cursor has run off the end, it is reset to the front and the
    /// pass level is raised by one; the level drops back to zero once it
    /// reaches the weight of the last node. The last node is the heaviest
    /// one because `add` keeps the list sorted by ascending weight in
    /// breadth-first mode.
    ///
    /// Returns `None` when the list is empty or no node from the cursor
    /// onwards has a weight above the pass level.
    async fn breadth(&self) -> Option<K> {
        // Write lock used as a mutex around the cursor updates (see `depth`).
        let guard = self.list.write().await;
        let nodes = guard.as_slice();
        // An empty list has no last element, so this doubles as the empty check.
        let heaviest = nodes.last()?.1;

        let mut start = self.index.load(Ordering::Relaxed);
        let mut level = self.weight.load(Ordering::Relaxed);
        if start >= nodes.len() {
            // Previous pass is complete: restart from the front one level up.
            start = 0;
            level = level.wrapping_add(1);
            if level >= heaviest {
                level = 0;
            }
            self.index.store(start, Ordering::Relaxed);
            self.weight.store(level, Ordering::Relaxed);
        }

        let offset = nodes[start..]
            .iter()
            .position(|(_, weight)| *weight > level)?;
        let hit = start + offset;
        // Next scan resumes right after the node just served.
        self.index.store(hit + 1, Ordering::Relaxed);
        Some(nodes[hit].0.clone())
    }
}

#[async_trait::async_trait]
impl<K: Clone + Eq + Send + Sync> BalancingStrategy<K> for Polling<K> {
    /// Registers key `k` with weight `n`.
    ///
    /// Depth-first mode appends the node. Breadth-first mode inserts it in
    /// front of the first node whose weight is at least `n` (or appends when
    /// there is none), which keeps the list ordered by ascending weight.
    async fn add(&self, k: K, n: usize) {
        let mut guard = self.list.write().await;
        let nodes = guard.deref_mut();
        let slot = match self.priority {
            Priority::BreadthFirst => nodes
                .iter()
                .position(|(_, weight)| n <= *weight)
                .unwrap_or(nodes.len()),
            Priority::DepthFirst => nodes.len(),
        };
        // Inserting at `len()` is the same as pushing to the back.
        nodes.insert(slot, (k, n));
    }

    /// Removes the first node whose key equals `k`; does nothing when no
    /// such node is registered.
    async fn remove(&self, k: K) {
        let mut guard = self.list.write().await;
        let nodes = guard.deref_mut();
        if let Some(found) = nodes.iter().position(|(key, _)| k == *key) {
            nodes.remove(found);
        }
    }

    /// Picks the next key according to the configured selection mode.
    async fn select(&self) -> Option<K> {
        match self.priority {
            Priority::BreadthFirst => self.breadth().await,
            Priority::DepthFirst => self.depth().await,
        }
    }
}