mco_rpc/
balance_manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use mco::{err};
5use mco::coroutine::sleep;
6use balance::{LoadBalance, LoadBalanceType};
7use client::Client;
8use mco::std::errors::Result;
9use mco::std::sync::{SyncHashMap};
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12
13pub trait RegistryCenter: Sync + Send {
15 fn pull(&self) -> HashMap<String, Vec<String>>;
17 fn push(&self, service: String, addr: String, ex:Duration) -> Result<()>;
18}
19
20#[derive(Debug, Clone)]
21pub struct ManagerConfig {
22 pub balance: LoadBalanceType,
23 pub interval: Duration,
24}
25
26impl ManagerConfig {
27 pub fn new() -> Self {
28 Self::default()
29 }
30 pub fn balance(mut self, balance: LoadBalanceType) -> Self {
31 self.balance = balance;
32 self
33 }
34 pub fn interval(mut self, d: Duration) -> Self {
35 self.interval = d;
36 self
37 }
38}
39
40impl Default for ManagerConfig {
41 fn default() -> Self {
42 ManagerConfig {
43 balance: LoadBalanceType::Round,
44 interval: Duration::from_secs(5),
45 }
46 }
47}
48
49
50pub struct BalanceManger {
53 pub config: ManagerConfig,
54 pub clients: SyncHashMap<String, LoadBalance<Client>>,
55 pub fetcher: Arc<dyn RegistryCenter>,
56}
57
58impl BalanceManger {
59 pub fn new<F>(cfg: ManagerConfig, f: F) -> Arc<Self> where F: RegistryCenter + 'static {
60 Arc::new(Self {
61 config: cfg,
62 clients: SyncHashMap::new(),
63 fetcher: Arc::new(f),
64 })
65 }
66
67 pub fn pull(&self) -> Result<()> {
69 let addrs = self.fetcher.pull();
70 if addrs.is_empty(){
71 self.clients.clear();
72 }
73 for (s, addrs) in addrs {
74 let balance = self.clients.get(&s);
75 if let Some(clients) = balance {
76 for addr in &addrs {
77 if !clients.contains(addr) {
78 let c = Client::dial(addr)?;
79 clients.put(c);
80 }
81 }
82 let mut removes = vec![];
83 for x in &clients.rpc_clients {
84 if !addrs.contains(&x.addr){
85 removes.push(&x.addr);
86 }
87 }
88 for x in removes {
89 clients.remove(x);
90 }
91 } else {
92 let clients = LoadBalance::new();
93 for x in addrs {
94 let c = Client::dial(&x)?;
95 clients.put(c);
96 }
97 self.clients.insert(s, clients);
98 }
99 }
100 return Ok(());
101 }
102
103 pub fn spawn_pull(&self) {
104 loop {
105 let r = self.pull();
106 if r.is_err() {
107 log::error!("service fetch fail:{}",r.err().unwrap());
108 }
109 sleep(self.config.interval);
110 }
111 }
112
113 pub fn spawn_push(&self,service: String, addr: String) {
114 loop {
115 let r = self.fetcher.push(service.clone(),addr.clone(),self.config.interval.clone()*2);
116 if r.is_err() {
117 log::error!("service fetch fail:{}",r.err().unwrap());
118 }
119 sleep(self.config.interval);
120 }
121 }
122
123 pub fn call<Arg, Resp>(&self, service: &str, func: &str, arg: Arg) -> Result<Resp> where Arg: Serialize, Resp: DeserializeOwned {
124 return match self.clients.get(service)
125 .ok_or(err!("no service '{}' find!",service))?
126 .do_balance(self.config.balance, "") {
127 None => {
128 Err(err!("no service '{}' find!",service))
129 }
130 Some(c) => {
131 c.call(func, arg)
132 }
133 };
134 }
135
136 pub fn call_all<Arg, Resp>(&self, service: &str, func: &str, arg: Arg, ip: &str) -> Result<Resp> where Arg: Serialize, Resp: DeserializeOwned {
137 return match self.clients
138 .get(service).ok_or(err!("no service '{}' find!",service))?
139 .do_balance(self.config.balance, ip) {
140 None => {
141 Err(err!("no service '{}' find!",service))
142 }
143 Some(c) => {
144 c.call(func, arg)
145 }
146 };
147 }
148}
149
150#[cfg(test)]
151mod test {
152 #[test]
153 fn test_fetch() {}
154}