cakerabbit_core/
cakeclient.rs

1
2use crate::{SelectorTyp, Response, Client, Register, new_selector, CakeError, CakeResult,
3            Failover, Failtry};
4use rmpv::Value;
5use tokio::net::TcpStream;
6use tokio_util::compat::TokioAsyncReadCompatExt;
7use std::io;
8use std::sync::Arc;
9use serde::{Serialize, Deserialize};
10use crate::failmode::FailMode;
11use crate::errors::cake_errors;
12
13#[derive(Clone, Serialize, Deserialize)]
14pub struct CakeClient {
15  svc_prefix:       String,
16  svc_name:         String,
17  reg_typ:          String,
18  reg_addr:         String,
19  selector_typ:     SelectorTyp,
20  failmode:         FailMode,
21}
22
23
24impl CakeClient {
25  pub fn new(svc_prefix: String, svc_name: String, reg_typ: String, reg_addr: String,
26             selector_typ: SelectorTyp, failmode: FailMode) -> Self {
27    CakeClient {
28      svc_prefix,
29      svc_name,
30      reg_typ,
31      reg_addr,
32      selector_typ,
33      failmode,
34    }
35  }
36
37  pub async fn call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
38    match &self.failmode {
39      FailMode::FailFast => {
40        let res = self.wrap_call(method, params).await;
41        return res;
42      },
43
44      FailMode::Failtry(fm ) => {
45        let mut retries = fm.retries;
46        while retries >= 0 {
47          let res = self.wrap_call(method, params).await;
48          match res {
49            Ok(rsp) => {
50              return Ok(rsp.to_string())
51            },
52
53            Err(err) => {
54              error!("cakeClient call Error -->>> {:?}", err)
55            }
56          }
57          retries -= 1;
58        }
59
60        // return Ok("error".to_string());
61        return Err(CakeError(cake_errors("callFailtryErr")));
62      },
63
64      _ => {}
65    }
66
67    return Ok("".to_string());
68  }
69
70  pub async fn wrap_call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
71    let svc_namex = &self.svc_name.clone();
72    let mut reg = Register::new_for_client((&self.reg_typ).to_string(),
73                                           (&self.reg_addr).to_string(),
74                                           svc_namex.to_string(),
75                                           (&self.svc_prefix).to_string(), false);
76    let svc_nodes = reg.get_service_nodes(self.svc_name.clone());
77    trace!("svc_nodes -->>> {:?}", svc_nodes);
78    let mut selector = new_selector(&self.selector_typ,
79                                    svc_nodes);
80    let node_addr = selector.select();
81
82    let socket = TcpStream::connect(&node_addr).await?;
83    let inner_client = Client::new(socket.compat());
84    let rsp = inner_client.call(method, params);
85
86    match rsp.await {
87      Ok(rspx) => {
88        Ok(rspx.to_string())
89     }
90
91      Err(err) => {
92        Err(CakeError(format!("client.call Error -->>> {:?}", err)))
93      }
94    }
95
96  }
97}
98