// cakerabbit_core/cakeclient.rs
2use crate::{SelectorTyp, Response, Client, Register, new_selector, CakeError, CakeResult,
3 Failover, Failtry};
4use rmpv::Value;
5use tokio::net::TcpStream;
6use tokio_util::compat::TokioAsyncReadCompatExt;
7use std::io;
8use std::sync::Arc;
9use serde::{Serialize, Deserialize};
10use crate::failmode::FailMode;
11use crate::errors::cake_errors;
12
13#[derive(Clone, Serialize, Deserialize)]
14pub struct CakeClient {
15 svc_prefix: String,
16 svc_name: String,
17 reg_typ: String,
18 reg_addr: String,
19 selector_typ: SelectorTyp,
20 failmode: FailMode,
21}
22
23
24impl CakeClient {
25 pub fn new(svc_prefix: String, svc_name: String, reg_typ: String, reg_addr: String,
26 selector_typ: SelectorTyp, failmode: FailMode) -> Self {
27 CakeClient {
28 svc_prefix,
29 svc_name,
30 reg_typ,
31 reg_addr,
32 selector_typ,
33 failmode,
34 }
35 }
36
37 pub async fn call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
38 match &self.failmode {
39 FailMode::FailFast => {
40 let res = self.wrap_call(method, params).await;
41 return res;
42 },
43
44 FailMode::Failtry(fm ) => {
45 let mut retries = fm.retries;
46 while retries >= 0 {
47 let res = self.wrap_call(method, params).await;
48 match res {
49 Ok(rsp) => {
50 return Ok(rsp.to_string())
51 },
52
53 Err(err) => {
54 error!("cakeClient call Error -->>> {:?}", err)
55 }
56 }
57 retries -= 1;
58 }
59
60 return Err(CakeError(cake_errors("callFailtryErr")));
62 },
63
64 _ => {}
65 }
66
67 return Ok("".to_string());
68 }
69
70 pub async fn wrap_call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
71 let svc_namex = &self.svc_name.clone();
72 let mut reg = Register::new_for_client((&self.reg_typ).to_string(),
73 (&self.reg_addr).to_string(),
74 svc_namex.to_string(),
75 (&self.svc_prefix).to_string(), false);
76 let svc_nodes = reg.get_service_nodes(self.svc_name.clone());
77 trace!("svc_nodes -->>> {:?}", svc_nodes);
78 let mut selector = new_selector(&self.selector_typ,
79 svc_nodes);
80 let node_addr = selector.select();
81
82 let socket = TcpStream::connect(&node_addr).await?;
83 let inner_client = Client::new(socket.compat());
84 let rsp = inner_client.call(method, params);
85
86 match rsp.await {
87 Ok(rspx) => {
88 Ok(rspx.to_string())
89 }
90
91 Err(err) => {
92 Err(CakeError(format!("client.call Error -->>> {:?}", err)))
93 }
94 }
95
96 }
97}
98