wd_balancing/
entity.rs

1use crate::{BalancingCall,BalancingStrategy,Linker,Polling};
2use std::sync::Arc;
3use std::collections::HashMap;
4use tokio::sync::RwLock;
5use std::ops::DerefMut;
6use std::hash::Hash;
7
8
9
10pub struct Balancing<K,Req, Res>{
11    map:RwLock<HashMap<K,Box<dyn BalancingCall<Req, Res>+Send+Sync>>>,
12    strategy:Arc<dyn BalancingStrategy<K> + Send + Sync>,
13    linker:Option<Arc<dyn Linker<K> + Send + Sync>>,
14}
15impl<K:Clone + Eq + Hash + Send + Sync + 'static ,Req, Res> Balancing<K,Req, Res>{
16    pub fn new()->Self{
17        let map = RwLock::new(HashMap::new());
18        let strategy = Arc::new(Polling::<K>::new());
19        let linker = None;
20        Self{map,strategy,linker}
21    }
22    pub fn set_strategy<B>(mut self, strategy:B) ->Self
23        where B:BalancingStrategy<K> + Send + Sync + 'static
24    {
25        let strategy = Arc::new(strategy);
26        self.strategy = strategy;
27        self
28    }
29    pub fn set_strategy_linker<B>(mut self,strategy:B)->Self
30        where B:BalancingStrategy<K>+Linker<K> + Send + Sync + 'static
31    {
32        let strategy = Arc::new(strategy);
33        self.strategy = strategy.clone();
34        self.linker = Some(strategy);
35        self
36    }
37
38    pub async fn add<N:BalancingCall<Req, Res>+Send+Sync+'static>(&self,key:K,n:N,w:usize){
39        if w <= 0{
40            return;
41        }
42        let mut map = self.map.write().await;
43        let map = map.deref_mut();
44        map.insert(key.clone(),Box::new(n));
45        self.strategy.add(key,w).await;
46    }
47    pub async fn remove(&self, key:K) -> Option<Box<dyn BalancingCall<Req, Res>+Send+Sync>> {
48        let mut map = self.map.write().await;
49        let map = map.deref_mut();
50        let res = map.remove(&key);
51        self.strategy.remove(key).await;
52        res
53    }
54    pub async fn call(&self,reqs:Req)->Option<Res>{
55        let mut resp = None;
56        let map = self.map.read().await;
57        if let Some(k) = self.strategy.select().await {
58
59            if let Some(node) = map.get(&k) {
60                if let Some(ref s) = self.linker {
61                    s.acquire(k.clone()).await
62                }
63
64                resp = node.call(reqs).await;
65
66                if let Some(ref s) = self.linker {
67                    s.release(k).await
68                }
69            }
70        }
71        resp
72    }
73}