mco_rpc/
balance_manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use mco::{err};
5use mco::coroutine::sleep;
6use balance::{LoadBalance, LoadBalanceType};
7use client::Client;
8use mco::std::errors::Result;
9use mco::std::sync::{SyncHashMap};
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12
13/// to fetch remote service addr list
14pub trait RegistryCenter: Sync + Send {
15    ///fetch [service]Vec<addr>
16    fn pull(&self) -> HashMap<String, Vec<String>>;
17    fn push(&self, service: String, addr: String, ex:Duration) -> Result<()>;
18}
19
20#[derive(Debug, Clone)]
21pub struct ManagerConfig {
22    pub balance: LoadBalanceType,
23    pub interval: Duration,
24}
25
26impl ManagerConfig {
27    pub fn new() -> Self {
28        Self::default()
29    }
30    pub fn balance(mut self, balance: LoadBalanceType) -> Self {
31        self.balance = balance;
32        self
33    }
34    pub fn interval(mut self, d: Duration) -> Self {
35        self.interval = d;
36        self
37    }
38}
39
40impl Default for ManagerConfig {
41    fn default() -> Self {
42        ManagerConfig {
43            balance: LoadBalanceType::Round,
44            interval: Duration::from_secs(5),
45        }
46    }
47}
48
49
50/// this is a connect manager.
51/// Accepts a server addresses list,make a client list.
52pub struct BalanceManger {
53    pub config: ManagerConfig,
54    pub clients: SyncHashMap<String, LoadBalance<Client>>,
55    pub fetcher: Arc<dyn RegistryCenter>,
56}
57
58impl BalanceManger {
59    pub fn new<F>(cfg: ManagerConfig, f: F) -> Arc<Self> where F: RegistryCenter + 'static {
60        Arc::new(Self {
61            config: cfg,
62            clients: SyncHashMap::new(),
63            fetcher: Arc::new(f),
64        })
65    }
66
67    /// fetch addr list
68    pub fn pull(&self) -> Result<()> {
69        let addrs = self.fetcher.pull();
70        if addrs.is_empty(){
71            self.clients.clear();
72        }
73        for (s, addrs) in addrs {
74            let balance = self.clients.get(&s);
75            if let Some(clients) = balance {
76                for addr in &addrs {
77                    if !clients.contains(addr) {
78                        let c = Client::dial(addr)?;
79                        clients.put(c);
80                    }
81                }
82                let mut removes = vec![];
83                for x in &clients.rpc_clients {
84                    if !addrs.contains(&x.addr){
85                        removes.push(&x.addr);
86                    }
87                }
88                for x in removes {
89                    clients.remove(x);
90                }
91            } else {
92                let clients = LoadBalance::new();
93                for x in addrs {
94                    let c = Client::dial(&x)?;
95                    clients.put(c);
96                }
97                self.clients.insert(s, clients);
98            }
99        }
100        return Ok(());
101    }
102
103    pub fn spawn_pull(&self) {
104        loop {
105            let r = self.pull();
106            if r.is_err() {
107                log::error!("service fetch fail:{}",r.err().unwrap());
108            }
109            sleep(self.config.interval);
110        }
111    }
112
113    pub fn spawn_push(&self,service: String, addr: String) {
114        loop {
115            let r = self.fetcher.push(service.clone(),addr.clone(),self.config.interval.clone()*2);
116            if r.is_err() {
117                log::error!("service fetch fail:{}",r.err().unwrap());
118            }
119            sleep(self.config.interval);
120        }
121    }
122
123    pub fn call<Arg, Resp>(&self, service: &str, func: &str, arg: Arg) -> Result<Resp> where Arg: Serialize, Resp: DeserializeOwned {
124        return match self.clients.get(service)
125            .ok_or(err!("no service '{}' find!",service))?
126            .do_balance(self.config.balance, "") {
127            None => {
128                Err(err!("no service '{}' find!",service))
129            }
130            Some(c) => {
131                c.call(func, arg)
132            }
133        };
134    }
135
136    pub fn call_all<Arg, Resp>(&self, service: &str, func: &str, arg: Arg, ip: &str) -> Result<Resp> where Arg: Serialize, Resp: DeserializeOwned {
137        return match self.clients
138            .get(service).ok_or(err!("no service '{}' find!",service))?
139            .do_balance(self.config.balance, ip) {
140            None => {
141                Err(err!("no service '{}' find!",service))
142            }
143            Some(c) => {
144                c.call(func, arg)
145            }
146        };
147    }
148}
149
150#[cfg(test)]
151mod test {
152    #[test]
153    fn test_fetch() {}
154}