cakerabbit_core/
reg.rs

1use std::net::{Shutdown, TcpStream};
2use crate::CakeError;
3use crate::reg_consul::{RegConsul};
4use env_logger::Env;
5use serde::__private::de::Content::{U32, U64};
6use crate::{CONFIG};
7use std::thread;
8use std::time;
9
10pub trait RegisterImpl {
11
12  fn do_reg(&mut self) -> Result<bool, CakeError>;
13
14  fn do_reg_http(&mut self, http_addr: String, typ: &str) -> Result<bool, CakeError>;
15
16  fn do_reg_external(&mut self, svc_address: String, typ: &str,
17    interval: u64) -> Result<bool, CakeError>;
18
19  fn watch_services(&mut self) -> Result<bool, CakeError>;
20
21  fn get_service_nodes(&self, service: String) -> Vec<String>;
22}
23
24pub struct Register {
25  adapter: String,
26  debug:   bool,
27}
28
29impl Register {
30  pub fn new_for_service(adapter: String, regaddr: String, svc_name: String, svc_prefix: String, svc_port: String, svc_ttl: String, debug: bool) -> Box<dyn RegisterImpl> {
31    return match adapter.as_str() {
32      "consul" => {
33        Box::new(RegConsul::new(regaddr, svc_name, svc_prefix,
34                                svc_port, svc_ttl, debug))
35      }
36
37      _ => {
38        Box::new(RegConsul::new(regaddr, svc_name, svc_prefix,
39                                svc_port, svc_ttl, debug))
40      }
41    }
42  }
43
44  pub fn new_for_client(adapter: String, regaddr: String, svc_name: String,
45                        svc_prefix: String, debug: bool) -> Box<dyn RegisterImpl> {
46    return match adapter.as_str() {
47      "consul" => {
48        Box::new(RegConsul::new(regaddr, svc_name, svc_prefix,
49                                "".into(), "".into(), debug))
50      }
51
52      _ => {
53        Box::new(RegConsul::new(regaddr, svc_name, svc_prefix,
54                                "".into(), "".into(), debug))
55      }
56    }
57  }
58}
59
60// check the status for external service, use tcp connect port
61pub fn check_service(svc_name: &str, svc_address: &str) -> bool {
62  // log::info!("=== check_service {} ===", svc_address);
63  let mut stream = TcpStream::connect(svc_address);
64  match stream {
65    Ok(s) => {
66      log::info!("=== check_service {}, {} === service status: true", svc_name, svc_address);
67      s.shutdown(Shutdown::Both).unwrap();
68      return true;
69    }
70
71    Err(_) => {
72      log::error!("=== check_service {}, {} === service status: false,\
73       checker will retries {} times in ThreadID {:?}", svc_name, svc_address, CONFIG["service_check_retries"], thread::current().id());
74
75      let service_check_retries = CONFIG["service_check_retries"].parse::<u64>().unwrap();
76      for i in 0..service_check_retries {
77        log::error!("=== check_service {}, {} === service status: false, checker retries \
78        times {} in ThreadID {:?}", svc_name, svc_address, i, thread::current().id());
79        let check_res = inner_check_service(svc_address);
80        if check_res {      // if service status OK again
81          return true;
82        }
83        let check_itval = CONFIG["service_check_interval"].parse::<u64>().unwrap();
84        thread::sleep(time::Duration::from_secs(check_itval));
85      }
86
87      log::error!("=== check_service {}, {} === service status: false in ThreadID {:?}",
88        svc_name, svc_address, thread::current().id());
89      return false;
90    }
91  }
92}
93
94// todo: retry check service will occur a problem, if thread-A is retrying check service, when
95// todo: the checker until 80 seconds, this time the service restart again, and online status
96// todo: within 10 seconds, the API create a thread-B to handle this. and the thread-A next 20
97// todo: seconds check will be OK, in this case, that has two threads to handler one service,
98// todo: it should not be. so this solution is not good enough!!
99// todo: but this solution can run OK, because when new request for register service, it has
100// todo: two condition
101// todo: 1 is the service info is not in register center, in this case, the new
102// todo: request will recreate the session, so it will not two threads handle one service, the old
103// todo: thread will occur [ERROR] `CustomError("renew session err` and exist. the new thread will
104// todo: take over it.
105// todo: 2 is the service info is still in register center, in this case, the old thread checker
106// todo: will check the service status, it is true, old thread will lopp handle again, the new
107// todo: thread waill exist when it found the service info is `exist`
108fn inner_check_service(svc_address: &str) -> bool {
109  let mut stream = TcpStream::connect(svc_address);
110  match stream {
111    Ok(s) => {
112      s.shutdown(Shutdown::Both).unwrap();
113      true
114    }
115
116    Err(_) => false
117  }
118}
119
120
121#[test]
122fn test_register_svc() {
123  env_logger::from_env(Env::default().default_filter_or("info")).init();
124  let mut reg = Register::new_for_service("consul".to_string(),
125                                          "8.8.8.8:8500".to_string(),
126                                          "my_svc".to_string(),
127                                          "cake/".to_string(),
128                                          "9527".to_string(),
129                                          "1m0s".to_string(),
130                                          true);
131  let res = reg.do_reg(); match res {
132    Ok(_) => {}
133    Err(_) => {}
134  }
135}
136
137#[test]
138fn check_service_test() {
139  env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
140  // let check = check_service("svc_name", "127.0.0.1:8500");
141  let check = check_service("service_access_demo", "127.0.0.1:8989");
142  println!("check -->>> {}", check);
143}
144
145
146
147