1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::{SelectorTyp, Response, Client, Register, new_selector, CakeError, CakeResult,
Failover, Failtry};
use rmpv::Value;
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use std::io;
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use crate::failmode::FailMode;
/// Configuration for an RPC client that looks up the nodes of a named service
/// through a registry, picks one of them with a selector, and calls it.
///
/// The struct only holds settings; `wrap_call` builds the registry handle,
/// selector and TCP connection on each invocation.
#[derive(Clone, Serialize, Deserialize)]
pub struct CakeClient {
/// Service prefix handed to `Register::new_for_client`.
svc_prefix: String,
/// Service name handed to `Register::new_for_client` and used as the key for
/// `get_service_nodes`.
svc_name: String,
/// Registry type handed to `Register::new_for_client`.
/// NOTE(review): presumably names the registry backend — confirm against `Register`.
reg_typ: String,
/// Registry address handed to `Register::new_for_client`.
reg_addr: String,
/// Strategy passed to `new_selector` to choose one node from the lookup result.
selector_typ: SelectorTyp,
/// Failure-handling mode that `call` matches on (fail fast, retry, ...).
failmode: FailMode,
}
impl CakeClient {
pub fn new(svc_prefix: String, svc_name: String, reg_typ: String, reg_addr: String,
selector_typ: SelectorTyp, failmode: FailMode) -> Self {
CakeClient {
svc_prefix,
svc_name,
reg_typ,
reg_addr,
selector_typ,
failmode,
}
}
pub async fn call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
match &self.failmode {
FailMode::FailFast => {
let res = self.wrap_call(method, params).await;
return res;
},
FailMode::Failtry(fm ) => {
let mut retries = fm.retries;
while retries >= 0 {
let res = self.wrap_call(method, params).await;
match res {
Ok(rsp) => {
return Ok(rsp.to_string())
},
Err(err) => {
error!("cakeClient call Error --------------- {:?}", err)
}
}
retries -= 1;
}
return Ok("--@@@--cakeClient call Failtry max retries Error--@@@--".to_string());
},
_ => {}
}
return Ok("".to_string());
}
pub async fn wrap_call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
let svc_namex = &self.svc_name.clone();
let mut reg = Register::new_for_client((&self.reg_typ).to_string(),
(&self.reg_addr).to_string(),
svc_namex.to_string(),
(&self.svc_prefix).to_string(), false);
let svc_nodes = reg.get_service_nodes(self.svc_name.clone());
trace!("svc_nodes ------------- {:?}", svc_nodes);
let mut selector = new_selector(&self.selector_typ,
svc_nodes);
let node_addr = selector.select();
let socket = TcpStream::connect(&node_addr).await?;
let inner_client = Client::new(socket.compat());
let rsp = inner_client.call(method, params);
match rsp.await {
Ok(rspx) => {
Ok(rspx.to_string())
}
Err(err) => {
Err(CakeError(format!("client.call Error: {:?}", err)))
}
}
}
}