1use super::agent::{AgentServiceRegistration, ServiceRegisterOpts};
2use super::health::{ServiceAddress, ServiceEntry};
3use super::watch::WatchService;
4use async_std::fs::read_to_string;
5use async_std::sync::{Arc, RwLock};
6use lazy_static::lazy_static;
7use rand::Rng;
8use serde_derive::{Deserialize, Serialize};
9use serde_yaml;
10use std::collections::{HashMap, LinkedList};
11use std::time;
12use surf;
13use surf::http::Method;
14use surf::{Error, StatusCode};
15use toml;
16
17lazy_static! {
18 pub static ref CONSUL_CONFIG: Arc<RwLock<ConsulConfig>> = {
19 let consul_config = ConsulConfig::default();
20 let consul_config = RwLock::new(consul_config);
21 Arc::new(consul_config)
22 };
23 pub static ref SERVICES_ADDRESS: Arc<RwLock<HashMap<String, ServiceAddress>>> = {
24 let hash_map = HashMap::new();
25 let hash_map = RwLock::new(hash_map);
26 Arc::new(hash_map)
27 };
28}
29
/// Top-level client configuration: connection settings plus the list of
/// services to watch. Deserialised from YAML or TOML by `load_config`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ConsulConfig {
    /// Connection settings. The request-sending methods return a
    /// "client init err" error when this is `None`.
    pub config: Option<Config>,
    /// Services refreshed by `watch_services`; when `None` that function
    /// returns immediately.
    pub watch_services: Option<Vec<WatchService>>,
}
35
36impl Default for ConsulConfig {
37 fn default() -> Self {
38 let mut config = Config::default();
39 config.address = Some(String::from("http://127.0.0.1:8500"));
40 config.datacenter = Some(String::from("dc1"));
41 ConsulConfig {
42 config: Some(config),
43 watch_services: None,
44 }
45 }
46}
47
48impl ConsulConfig {
49 pub async fn load_config(path: &str) -> surf::Result<()> {
50 let content = read_to_string(path).await?;
51 let mut config = ConsulConfig::default();
52
53 if path.ends_with(".yml") || path.ends_with(".yaml") {
54 config = serde_yaml::from_str(&content)?;
55 } else if path.ends_with(".toml") {
56 config = toml::from_str(&content)?;
57 }
58
59 let consul_config = CONSUL_CONFIG.clone();
60 let mut consul_config = consul_config.write().await;
61 consul_config.config = config.config;
62 consul_config.watch_services = config.watch_services;
63 Ok(())
64 }
65
66 pub async fn new_request(&self, method: Method, path: &str) -> surf::Result<surf::Request> {
67 let config = self.config.as_ref().expect("consul config is empty");
68 let address = config
69 .address
70 .as_ref()
71 .expect("consul config address is empty");
72 let url = format!("{}{}", address, path);
73 let uri = surf::Url::parse(&url)?;
74 let mut req = surf::Request::new(method, uri);
75 req.set_header("Connection", "close");
76 let mut body: HashMap<String, String> = HashMap::new();
77
78 if config.datacenter.is_some() {
79 body.insert(
80 String::from("dc"),
81 String::from(config.datacenter.as_ref().unwrap()),
82 );
83 };
84 if config.namespace.is_some() {
85 body.insert(
86 String::from("ns"),
87 String::from(config.namespace.as_ref().unwrap()),
88 );
89 };
90
91 if config.wait_time.is_some() {
92 let wait = config.wait_time.as_ref().unwrap().to_string();
93 body.insert(String::from("wait"), wait);
94 } else {
95 body.insert(String::from("wait"), String::from("5s"));
96 }
97
98 if config.token.is_some() {
99 body.insert(
100 "X-Consul-Token".to_string(),
101 String::from(config.token.as_ref().unwrap()),
102 );
103 };
104
105 req.body_json(&body)?;
106 Ok(req)
107 }
108
109 pub async fn service_register(
127 &self,
128 service: &AgentServiceRegistration,
129 ) -> surf::Result<StatusCode> {
130 let opts = ServiceRegisterOpts::default();
131 let status = self.service_register_self(service, &opts).await?;
132 Ok(status)
133 }
134
135 pub async fn service_register_opts(
136 &self,
137 service: &AgentServiceRegistration,
138 opts: &ServiceRegisterOpts,
139 ) -> surf::Result<StatusCode> {
140 let status = self.service_register_self(service, opts).await?;
141 Ok(status)
142 }
143
144 pub async fn service_register_self(
145 &self,
146 service: &AgentServiceRegistration,
147 opts: &ServiceRegisterOpts,
148 ) -> surf::Result<StatusCode> {
149 if self.config.is_some() {
150 let mut req = self
151 .new_request(Method::Put, "/v1/agent/service/register")
152 .await?;
153 if opts.ReplaceExistingChecks == true {
154 req.set_query(&opts)?;
155 };
156 req.body_json(&service)?;
157 let client = surf::Client::new();
158 let res = client.send(req).await?;
159 Ok(res.status())
160 } else {
161 Err(Error::from_str(StatusCode::BadRequest, "client init err"))
162 }
163 }
164
165 pub async fn service_deregister(&self, service_id: String) -> surf::Result<StatusCode> {
180 if self.config.is_some() {
181 let uri = format!("/v1/agent/service/deregister/{}", service_id);
182 let req = self.new_request(Method::Put, &uri).await?;
183 let client = surf::Client::new();
184 let res = client.send(req).await?;
185 Ok(res.status())
186 } else {
187 Err(Error::from_str(StatusCode::BadRequest, "client init err"))
188 }
189 }
190
191 pub async fn watch_services() -> surf::Result<()> {
192 let config = CONSUL_CONFIG.clone();
193 let config = config.read().await;
194 if config.watch_services.is_some() {
195 loop {
196 let watch_services = config.watch_services.as_ref().unwrap();
197 let mut service_await = vec![];
198
199 for watch_service in watch_services.iter() {
200 service_await.push(config.get_address(watch_service))
201 }
202 let mut vv = HashMap::new();
203 for v in service_await.into_iter() {
204 let (key, service_address) = v.await?;
205 if key != "" {
206 vv.insert(key, service_address);
207 }
208 }
209 if vv.len() != 0 {
210 let services_addresses = SERVICES_ADDRESS.clone();
211 let mut services_addresses = services_addresses.write().await;
212 for (key, service_address) in vv.iter() {
213 services_addresses.insert(key.to_string(), service_address.to_owned());
214 }
215 }
216 }
217 }
218 Ok(())
219 }
220
221 async fn health_service(
222 &self,
223 watch_service: &WatchService,
224 ) -> surf::Result<(u64, Vec<ServiceEntry>)> {
225 let path = format!("/v1/health/service/{}", watch_service.service_name);
226 if self.config.is_some() {
227 let mut req = self.new_request(Method::Get, &path).await?;
228 let mut query: HashMap<&str, String> = HashMap::new();
229 let default = String::new();
230 let tag = watch_service.tag.as_ref().unwrap_or(&default);
231 if tag != "" {
232 query.insert("tag", tag.to_string());
233 }
234 let services_addresses = SERVICES_ADDRESS.clone();
235 let services_addresses = services_addresses.read().await;
236 let key = format!("{}{}", watch_service.service_name, tag);
237 let service_address = services_addresses.get(&key);
238 let mut index = 0;
239 if service_address.is_some() {
240 let service_address = service_address.unwrap();
241 index = service_address.index;
242 }
243 query.insert("index", index.to_string());
244
245 if watch_service.passing_only.is_some() {
246 let passing = watch_service.passing_only.unwrap();
247 if passing {
248 let config = self.config.as_ref().unwrap();
249 let wait;
250 if config.wait_time.is_some() {
251 wait = config.wait_time.as_ref().unwrap().to_string();
252 } else {
253 wait = String::from("5s")
254 }
255 query.insert("passing", "1".to_string());
256 query.insert("wait", wait);
257 }
258 };
259 req.set_query(&query)?;
260 let uri = req.url().to_string();
261 log::debug!("{}", uri);
262 let client = surf::Client::new();
263 let mut res = client.send(req).await?;
264 let out: Vec<ServiceEntry> = res.body_json().await?;
265 Ok((index, out))
266 } else {
267 Err(Error::from_str(StatusCode::BadRequest, "client init err"))
268 }
269 }
270
271 async fn get_address(
272 &self,
273 watch_service: &WatchService,
274 ) -> surf::Result<(String, ServiceAddress)> {
275 let (cur_index, entry) = self.health_service(watch_service).await?;
276 let mut service_addresses = vec![];
277 let mut service_addresses_link = LinkedList::new();
278 let mut index = 0;
279 for val in entry.iter() {
280 if val.Service.is_some() {
281 let v = val.Service.as_ref().unwrap();
282 if v.Address.is_some() && v.Port.is_some() {
283 index = v.ModifyIndex.unwrap();
284 if index == cur_index {
285 continue;
286 };
287 let address = v.Address.as_ref().unwrap();
288 let port = v.Port.as_ref().unwrap();
289 let address = format!("{}:{}", address, port);
290 service_addresses.push(address.to_owned());
291 service_addresses_link.push_back(address);
292 };
293 };
294 };
295 if service_addresses.len() == 0 {
296 return Ok((String::new(), ServiceAddress::default()));
297 };
298 let mut tag = "";
299 if watch_service.tag.is_some() {
300 tag = watch_service.tag.as_ref().unwrap();
301 };
302 let key = format!("{}{}", watch_service.service_name, tag);
303 let service_addresses = ServiceAddress {
304 index,
305 address: service_addresses,
306 address_link: service_addresses_link,
307 };
308
309 Ok((key, service_addresses))
310 }
311
312 pub async fn random_policy(&self, service_name: &str, tag: &str) -> surf::Result<String> {
313 let key = format!("{}{}", service_name, tag);
314 let services_addresses = SERVICES_ADDRESS.clone();
315 let services_addresses = services_addresses.read().await;
316 let service_addresses = services_addresses.get(&key);
317 if service_addresses.is_some() {
318 let service_addresses = service_addresses.unwrap();
319 let range = service_addresses.address.len();
320 if range == 0 {
321 return Err(Error::from_str(
322 StatusCode::BadRequest,
323 "consul server address is empty",
324 ));
325 };
326 let mut r = rand::thread_rng();
327 let idx: usize = r.gen_range(0..range);
328 let address = service_addresses.address.get(idx).unwrap();
329 return Ok(String::from(address));
330 }
331 Err(Error::from_str(
332 StatusCode::BadRequest,
333 "consul server address is empty",
334 ))
335 }
336}
337
/// Connection settings for talking to a Consul agent.
///
/// Only `address`, `datacenter`, `namespace`, `wait_time` and `token` are read
/// by the code in this file; the remaining fields are carried but unused here.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
// NOTE(review): every field below is already snake_case, so this allow looks
// unnecessary — confirm before removing.
#[allow(non_snake_case)]
pub struct Config {
    /// Base URL prepended to every request path; `new_request` cannot build a
    /// request without it.
    pub address: Option<String>,

    /// Not read in this file.
    pub scheme: Option<String>,

    /// Sent by `new_request` as the `dc` parameter when set.
    pub datacenter: Option<String>,

    /// Sent as the `wait` parameter; `5s` is used when unset.
    pub wait_time: Option<String>,

    /// Sent by `new_request` under the name `X-Consul-Token` when set.
    pub token: Option<String>,

    /// Not read in this file.
    pub token_file: Option<String>,

    /// Sent by `new_request` as the `ns` parameter when set.
    pub namespace: Option<String>,

    /// Not read in this file.
    pub tls_config: Option<TLSConfig>,
}
379
/// TLS-related settings carried inside `Config::tls_config`.
///
/// None of these fields are read by the code in this file. Their names suggest
/// the usual file/path/PEM variants of CA, certificate and key material —
/// TODO confirm against the code that consumes them.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
// NOTE(review): every field below is already snake_case, so this allow looks
// unnecessary — confirm before removing.
#[allow(non_snake_case)]
pub struct TLSConfig {
    pub address: Option<String>,

    pub ca_file: Option<String>,

    pub ca_path: Option<String>,

    pub ca_pem: Option<String>,

    pub cert_file: Option<String>,

    pub cert_pem: Option<String>,

    pub key_file: Option<String>,

    pub key_pem: Option<String>,

    pub insecure_skip_verify: Option<bool>,
}
421
/// Optional per-query parameters.
///
/// Nothing in this file constructs or reads this type. The PascalCase field
/// names presumably mirror an upstream Consul client's `QueryOptions` — TODO
/// confirm — which is why the naming lint is silenced below.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
// Field names are intentionally PascalCase, hence the lint exemption.
#[allow(non_snake_case)]
pub struct QueryOptions {
    pub Namespace: Option<String>,

    pub Datacenter: Option<String>,

    pub AllowStale: Option<bool>,

    pub RequireConsistent: Option<bool>,

    pub UseCache: Option<bool>,

    pub MaxAge: Option<time::Duration>,

    pub StaleIfError: Option<time::Duration>,

    pub WaitIndex: Option<usize>,

    pub WaitHash: Option<String>,

    pub WaitTime: Option<time::Duration>,

    pub Token: Option<String>,

    pub Near: Option<String>,

    pub NodeMeta: Option<HashMap<String, String>>,

    pub RelayFactor: Option<u8>,

    pub LocalOnly: Option<bool>,

    pub Connect: Option<bool>,

    pub Filter: Option<String>,
}
510
#[cfg(test)]
mod tests {
    use crate::api::Config;

    /// Entry point picked up by the test harness.
    #[test]
    fn it_works() {
        test_watch_services()
    }

    /// Points the global configuration at `http://127.0.0.1:8500` (datacenter
    /// `dc1`) and resolves the `hyat_rust` service with `passing_only` set.
    ///
    /// The lookup result is unwrapped, so this fails whenever the request to
    /// that address does not succeed.
    pub fn test_watch_services() {
        use crate::api::CONSUL_CONFIG;
        use crate::watch::WatchService;
        use async_std::task::block_on;

        let mut consul = block_on(CONSUL_CONFIG.write());
        consul.config = Some(Config {
            datacenter: Some(String::from("dc1")),
            address: Some(String::from("http://127.0.0.1:8500")),
            ..Config::default()
        });

        let mut target = WatchService::default();
        target.service_name = String::from("hyat_rust");
        target.passing_only = Some(true);

        let resolved = block_on(consul.get_address(&target)).unwrap();
        println!("{:?}", resolved);
    }
}