1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

use crate::{SelectorTyp, Response, Client, Register, new_selector, CakeError, CakeResult,
            Failover, Failtry};
use rmpv::Value;
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use std::io;
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use crate::failmode::FailMode;

#[derive(Clone, Serialize, Deserialize)]
pub struct CakeClient {
  svc_prefix:       String,
  svc_name:         String,
  reg_typ:          String,
  reg_addr:         String,
  selector_typ:     SelectorTyp,
  failmode:         FailMode,
}


impl CakeClient {
  pub fn new(svc_prefix: String, svc_name: String, reg_typ: String, reg_addr: String,
             selector_typ: SelectorTyp, failmode: FailMode) -> Self {
    CakeClient {
      svc_prefix,
      svc_name,
      reg_typ,
      reg_addr,
      selector_typ,
      failmode,
    }
  }

  pub async fn call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
    match &self.failmode {
      FailMode::FailFast => {
        let res = self.wrap_call(method, params).await;
        return res;
      },

      FailMode::Failtry(fm ) => {
        let mut retries = fm.retries;
        while retries >= 0 {
          let res = self.wrap_call(method, params).await;
          match res {
            Ok(rsp) => {
              return Ok(rsp.to_string())
            },

            Err(err) => {
              error!("cakeClient call Error --------------- {:?}", err)
            }
          }
          retries -= 1;
        }

        return Ok("--@@@--cakeClient call Failtry max retries Error--@@@--".to_string());
      },

      _ => {}
    }

    return Ok("".to_string());
  }

  pub async fn wrap_call(&mut self, method: &str, params: &[Value]) -> CakeResult<String> {
    let svc_namex = &self.svc_name.clone();
    let mut reg = Register::new_for_client((&self.reg_typ).to_string(),
                                           (&self.reg_addr).to_string(),
                                           svc_namex.to_string(),
                                           (&self.svc_prefix).to_string(), false);
    let svc_nodes = reg.get_service_nodes(self.svc_name.clone());
    trace!("svc_nodes ------------- {:?}", svc_nodes);
    let mut selector = new_selector(&self.selector_typ,
                                    svc_nodes);
    let node_addr = selector.select();

    let socket = TcpStream::connect(&node_addr).await?;
    let inner_client = Client::new(socket.compat());
    let rsp = inner_client.call(method, params);

    match rsp.await {
      Ok(rspx) => {
        Ok(rspx.to_string())
     }

      Err(err) => {
        Err(CakeError(format!("client.call Error: {:?}", err)))
      }
    }

  }
}