//! Crate root: declares the balancing modules and re-exports their public
//! types (`Balancing`, the `BalancingCall` / `Linker` / `BalancingStrategy`
//! interfaces, and the `Polling`, `Random` and `MinConn` strategies).
mod entity;
mod interface;
mod polling;
mod random;
mod minconn;

pub use interface::{BalancingCall,Linker,BalancingStrategy};
pub use polling::Polling;
pub use entity::Balancing;
pub use random::Random;
pub use minconn::MinConn;

#[cfg(test)]
mod test{
    use crate::{BalancingCall, Balancing, Polling, Random, MinConn};
    use rand::Rng;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::ops::DerefMut;

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_main(){
        let t = std::time::Instant::now();
        let rng = Arc::new(Mutex::new(rand::rngs::OsRng::default()));
        let threads = Arc::new(AtomicUsize::new(100));
        for _ in 0..100 {
            let threads = threads.clone();
            let rng = rng.clone();
            tokio::spawn(async{
                let rng = rng;
                let threads = threads;
                for _ in 0..1000 {
                   let mut rng = rng.lock().await;
                   let rng =  rng.deref_mut();
                    let _res = rng.gen_range(0..1000);
                }
                threads.fetch_sub(1,Ordering::Relaxed);
            });
        }
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            if threads.load(Ordering::Relaxed) == 0{
                break
            }
        }
        let res = t.elapsed().as_millis();
        let res = (res as f64) / 1000.0;
        wd_log::log_debug_ln!("四个线程生成一百万个随机数需要:{}s",res)
    }
    /// Minimal backend node used by the tests below; it carries nothing but
    /// a static label.
    struct Service {
        /// Label that identifies this node in the responses it produces.
        name: &'static str,
    }

    impl Service {
        /// Builds a node carrying the given label.
        fn new(name: &'static str) -> Self {
            Service { name }
        }
    }
    /// Test implementation of the balancing call: waits one second to
    /// simulate work, then answers with a string that echoes the request and
    /// names the node that handled it. It always returns `Some`.
    #[async_trait::async_trait]
    impl BalancingCall<String, String> for Service {
        async fn call(&self, reqs: String) -> Option<String> {
            // Simulated processing time of the fake backend.
            let simulated_latency = tokio::time::Duration::from_secs(1);
            tokio::time::sleep(simulated_latency).await;

            let reply = format!("request:{} -> response:{}", reqs, self.name);
            Some(reply)
        }
    }
    /// Walks the breadth-first `Polling` strategy through three phases and
    /// logs every response:
    ///
    /// 1. three registered nodes, nine calls;
    /// 2. node 2 removed, nine calls;
    /// 3. nodes 1 and 3 removed as well, three calls, logging a notice
    ///    whenever a call yields `None`.
    ///
    /// The test only logs; it makes no assertions about which node answers.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_polling(){
        wd_log::log_debug_ln!("start test polling");
        let bl = Balancing::new()
            .set_strategy(Polling::new().breadth_first());

        bl.add(1,Service::new("我是第一个节点"),1).await;
        bl.add(2,Service::new("我是第二个节点"),1).await;
        bl.add(3,Service::new("我是第三个节点"),1).await;

        for i in 0..9_usize {
            if let Some(s) = bl.call(i.to_string()).await {
                wd_log::log_debug_ln!("-> {}",s);
            }
        }
        // Fix: node 2 was announced as removed but never actually removed, so
        // the later "all nodes removed" phase still had one node registered.
        bl.remove(2).await;
        wd_log::log_debug_ln!("===================> 移除2号节点===========================");
        for i in 0..9_usize {
            if let Some(s) = bl.call(i.to_string()).await {
                wd_log::log_debug_ln!("-> {}",s);
            }
        }
        bl.remove(1).await;
        bl.remove(3).await;
        wd_log::log_debug_ln!("===================> 移除全部节点===========================");
        for i in 0..3_usize {
            match bl.call(i.to_string()).await {
                Some(s) => {
                    wd_log::log_debug_ln!("-> {}",s);
                }
                None => {
                    wd_log::log_debug_ln!("-> 当前没有存活的节点");
                }
            }
        }
    }
    /// Walks the `Random` strategy through three phases and logs every
    /// response: three nodes registered (18 calls), node 2 removed (9 calls),
    /// then nodes 1 and 3 removed too (3 calls, logging a notice whenever a
    /// call yields `None`).
    ///
    /// The nodes are added with 3, 2 and 1 as the last `add` argument
    /// (presumably a weight; confirm against `Balancing::add`).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_random(){
        wd_log::log_debug_ln!("start test random");
        let balancer = Balancing::new().set_strategy(Random::new());

        balancer.add(1, Service::new("我是第一个节点"), 3).await;
        balancer.add(2, Service::new("我是第二个节点"), 2).await;
        balancer.add(3, Service::new("我是第三个节点"), 1).await;

        // Phase 1: all three nodes are registered.
        for seq in 0..18_usize {
            match balancer.call(seq.to_string()).await {
                Some(reply) => {
                    wd_log::log_debug_ln!("-> {}",reply);
                }
                None => {}
            }
        }

        // Phase 2: node 2 is gone.
        balancer.remove(2).await;
        wd_log::log_debug_ln!("===================> 移除2号节点===========================");
        for seq in 0..9_usize {
            match balancer.call(seq.to_string()).await {
                Some(reply) => {
                    wd_log::log_debug_ln!("-> {}",reply);
                }
                None => {}
            }
        }

        // Phase 3: no nodes remain.
        balancer.remove(1).await;
        balancer.remove(3).await;
        wd_log::log_debug_ln!("===================> 移除全部节点===========================");
        for seq in 0..3_usize {
            match balancer.call(seq.to_string()).await {
                Some(reply) => {
                    wd_log::log_debug_ln!("-> {}",reply);
                }
                None => {
                    wd_log::log_debug_ln!("-> 当前没有存活的节点");
                }
            }
        }
    }
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_minconn(){
        wd_log::log_debug_ln!("start test minconn");
        let bl = Balancing::new()
            .set_strategy_linker(MinConn::new());
        let bl = Arc::new(bl);

        bl.add(1,Service::new("我是第一个节点"),3).await;
        bl.add(2,Service::new("我是第二个节点"),2).await;
        bl.add(3,Service::new("我是第三个节点"),1).await;

        for i in (0..18).map(|x|x as usize) {
            let bbl = bl.clone();
            tokio::spawn(async move{
                let resp = bbl.call(i.to_string()).await;
                if let Some(s) = resp{
                    wd_log::log_debug_ln!("-> {}",s)
                }
            });
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        bl.remove(2).await;
        wd_log::log_debug_ln!("===================> 移除2号节点===========================");
        for i in (0..9).map(|x|x as usize) {
            let resp = bl.call(i.to_string()).await;
            if let Some(s) = resp{
                wd_log::log_debug_ln!("-> {}",s)
            }
        }
        bl.remove(1).await;
        bl.remove(3).await;
        wd_log::log_debug_ln!("===================> 移除全部节点===========================");
        for i in (0..3).map(|x|x as usize) {
            let resp = bl.call(i.to_string()).await;
            if let Some(s) = resp{
                wd_log::log_debug_ln!("-> {}",s);
            }else{
                wd_log::log_debug_ln!("-> 当前没有存活的节点");
            }
        }
    }
}