Skip to main content

consul_rs/
api.rs

1use super::agent::{AgentServiceRegistration, ServiceRegisterOpts};
2use super::health::{ServiceAddress, ServiceEntry};
3use super::watch::WatchService;
4use async_std::fs::read_to_string;
5use async_std::sync::{Arc, RwLock};
6use lazy_static::lazy_static;
7use rand::Rng;
8use serde_derive::{Deserialize, Serialize};
9use serde_yaml;
10use std::collections::{HashMap, LinkedList};
11use std::time;
12use surf;
13use surf::http::Method;
14use surf::{Error, StatusCode};
15use toml;
16
17lazy_static! {
18    pub static ref CONSUL_CONFIG: Arc<RwLock<ConsulConfig>> = {
19        let consul_config = ConsulConfig::default();
20        let consul_config = RwLock::new(consul_config);
21        Arc::new(consul_config)
22    };
23    pub static ref SERVICES_ADDRESS: Arc<RwLock<HashMap<String, ServiceAddress>>> = {
24        let hash_map = HashMap::new();
25        let hash_map = RwLock::new(hash_map);
26        Arc::new(hash_map)
27    };
28}
29
30#[derive(Debug, Serialize, Deserialize, Clone)]
31pub struct ConsulConfig {
32    pub config: Option<Config>,
33    pub watch_services: Option<Vec<WatchService>>,
34}
35
36impl Default for ConsulConfig {
37    fn default() -> Self {
38        let mut config = Config::default();
39        config.address = Some(String::from("http://127.0.0.1:8500"));
40        config.datacenter = Some(String::from("dc1"));
41        ConsulConfig {
42            config: Some(config),
43            watch_services: None,
44        }
45    }
46}
47
48impl ConsulConfig {
49    pub async fn load_config(path: &str) -> surf::Result<()> {
50        let content = read_to_string(path).await?;
51        let mut config = ConsulConfig::default();
52
53        if path.ends_with(".yml") || path.ends_with(".yaml") {
54            config = serde_yaml::from_str(&content)?;
55        } else if path.ends_with(".toml") {
56            config = toml::from_str(&content)?;
57        }
58
59        let consul_config = CONSUL_CONFIG.clone();
60        let mut consul_config = consul_config.write().await;
61        consul_config.config = config.config;
62        consul_config.watch_services = config.watch_services;
63        Ok(())
64    }
65
66    pub async fn new_request(&self, method: Method, path: &str) -> surf::Result<surf::Request> {
67        let config = self.config.as_ref().expect("consul config is empty");
68        let address = config
69            .address
70            .as_ref()
71            .expect("consul config address is empty");
72        let url = format!("{}{}", address, path);
73        let uri = surf::Url::parse(&url)?;
74        let mut req = surf::Request::new(method, uri);
75        req.set_header("Connection", "close");
76        let mut body: HashMap<String, String> = HashMap::new();
77
78        if config.datacenter.is_some() {
79            body.insert(
80                String::from("dc"),
81                String::from(config.datacenter.as_ref().unwrap()),
82            );
83        };
84        if config.namespace.is_some() {
85            body.insert(
86                String::from("ns"),
87                String::from(config.namespace.as_ref().unwrap()),
88            );
89        };
90
91        if config.wait_time.is_some() {
92            let wait = config.wait_time.as_ref().unwrap().to_string();
93            body.insert(String::from("wait"), wait);
94        } else {
95            body.insert(String::from("wait"), String::from("5s"));
96        }
97
98        if config.token.is_some() {
99            body.insert(
100                "X-Consul-Token".to_string(),
101                String::from(config.token.as_ref().unwrap()),
102            );
103        };
104
105        req.body_json(&body)?;
106        Ok(req)
107    }
108
109    /// service_register is used to register a new service with
110    /// the local agent
111    ///
112    /// ```
113    /// use consul_rs::api::CONSUL_CONFIG;
114    /// use async_std::task::block_on;
115    /// use consul_rs::agent::AgentServiceRegistration;
116    /// let clone_consul = CONSUL_CONFIG.clone();
117    /// let consul = block_on(clone_consul.read());
118    /// let mut service = AgentServiceRegistration::default();
119    /// service.ID = Some(String::from("321"));
120    /// service.Name = Some(String::from("test"));
121    /// service.Port = Some(8080);
122    /// service.Address = Some(String::from("127.0.0.1"));
123    /// let s = block_on(consul.service_register(&service)).unwrap();
124    /// println!("{}", s);
125    /// ```
126    pub async fn service_register(
127        &self,
128        service: &AgentServiceRegistration,
129    ) -> surf::Result<StatusCode> {
130        let opts = ServiceRegisterOpts::default();
131        let status = self.service_register_self(service, &opts).await?;
132        Ok(status)
133    }
134
135    pub async fn service_register_opts(
136        &self,
137        service: &AgentServiceRegistration,
138        opts: &ServiceRegisterOpts,
139    ) -> surf::Result<StatusCode> {
140        let status = self.service_register_self(service, opts).await?;
141        Ok(status)
142    }
143
144    pub async fn service_register_self(
145        &self,
146        service: &AgentServiceRegistration,
147        opts: &ServiceRegisterOpts,
148    ) -> surf::Result<StatusCode> {
149        if self.config.is_some() {
150            let mut req = self
151                .new_request(Method::Put, "/v1/agent/service/register")
152                .await?;
153            if opts.ReplaceExistingChecks == true {
154                req.set_query(&opts)?;
155            };
156            req.body_json(&service)?;
157            let client = surf::Client::new();
158            let res = client.send(req).await?;
159            Ok(res.status())
160        } else {
161            Err(Error::from_str(StatusCode::BadRequest, "client init err"))
162        }
163    }
164
165    /// service_deregister is used to register a new service with
166    /// the local agent
167    ///
168    /// ```
169    /// use consul_rs::api::CONSUL_CONFIG;
170    /// use async_std::task::block_on;
171    /// use consul_rs::ConsulTrait;
172    /// use consul_rs::agent::AgentServiceRegistration;
173    /// let clone_consul = CONSUL_CONFIG.clone();
174    /// let consul = block_on(clone_consul.read());
175    /// let service_id = String::from("321");
176    /// let s = block_on(consul.service_deregister(service_id)).unwrap();
177    /// println!("{}", s);
178    /// ```
179    pub async fn service_deregister(&self, service_id: String) -> surf::Result<StatusCode> {
180        if self.config.is_some() {
181            let uri = format!("/v1/agent/service/deregister/{}", service_id);
182            let req = self.new_request(Method::Put, &uri).await?;
183            let client = surf::Client::new();
184            let res = client.send(req).await?;
185            Ok(res.status())
186        } else {
187            Err(Error::from_str(StatusCode::BadRequest, "client init err"))
188        }
189    }
190
191    pub async fn watch_services() -> surf::Result<()> {
192        let config =  CONSUL_CONFIG.clone();
193        let config = config.read().await;
194        if config.watch_services.is_some() {
195            loop {
196                let watch_services = config.watch_services.as_ref().unwrap();
197                let mut service_await = vec![];
198
199                for watch_service in watch_services.iter() {
200                    service_await.push(config.get_address(watch_service))
201                }
202                let mut vv = HashMap::new();
203                for v in service_await.into_iter() {
204                    let (key, service_address) = v.await?;
205                    if key != "" {
206                        vv.insert(key, service_address);
207                    }
208                }
209                if vv.len() != 0 {
210                    let services_addresses = SERVICES_ADDRESS.clone();
211                    let mut services_addresses = services_addresses.write().await;
212                    for (key, service_address) in vv.iter() {
213                        services_addresses.insert(key.to_string(), service_address.to_owned());
214                    }
215                }
216            }
217        }
218        Ok(())
219    }
220
221    async fn health_service(
222        &self,
223        watch_service: &WatchService,
224    ) -> surf::Result<(u64, Vec<ServiceEntry>)> {
225        let path = format!("/v1/health/service/{}", watch_service.service_name);
226        if self.config.is_some() {
227            let mut req = self.new_request(Method::Get, &path).await?;
228            let mut query: HashMap<&str, String> = HashMap::new();
229            let default = String::new();
230            let tag = watch_service.tag.as_ref().unwrap_or(&default);
231            if tag != "" {
232                query.insert("tag", tag.to_string());
233            }
234            let services_addresses = SERVICES_ADDRESS.clone();
235            let services_addresses = services_addresses.read().await;
236            let key = format!("{}{}", watch_service.service_name, tag);
237            let service_address = services_addresses.get(&key);
238            let mut index = 0;
239            if service_address.is_some() {
240                let service_address = service_address.unwrap();
241                index = service_address.index;
242            }
243            query.insert("index", index.to_string());
244
245            if watch_service.passing_only.is_some() {
246                let passing = watch_service.passing_only.unwrap();
247                if passing {
248                    let config = self.config.as_ref().unwrap();
249                    let wait;
250                    if config.wait_time.is_some() {
251                        wait = config.wait_time.as_ref().unwrap().to_string();
252                    } else {
253                        wait = String::from("5s")
254                    }
255                    query.insert("passing", "1".to_string());
256                    query.insert("wait", wait);
257                }
258            };
259            req.set_query(&query)?;
260            let uri = req.url().to_string();
261            log::debug!("{}", uri);
262            let client = surf::Client::new();
263            let mut res = client.send(req).await?;
264            let out: Vec<ServiceEntry> = res.body_json().await?;
265            Ok((index, out))
266        } else {
267            Err(Error::from_str(StatusCode::BadRequest, "client init err"))
268        }
269    }
270
271    async fn get_address(
272        &self,
273        watch_service: &WatchService,
274    ) -> surf::Result<(String, ServiceAddress)> {
275        let (cur_index, entry) = self.health_service(watch_service).await?;
276        let mut service_addresses = vec![];
277        let mut service_addresses_link = LinkedList::new();
278        let mut index = 0;
279        for val in entry.iter() {
280            if val.Service.is_some() {
281                let v = val.Service.as_ref().unwrap();
282                if v.Address.is_some() && v.Port.is_some() {
283                    index = v.ModifyIndex.unwrap();
284                    if index == cur_index {
285                        continue;
286                    };
287                    let address = v.Address.as_ref().unwrap();
288                    let port = v.Port.as_ref().unwrap();
289                    let address = format!("{}:{}", address, port);
290                    service_addresses.push(address.to_owned());
291                    service_addresses_link.push_back(address);
292                };
293            };
294        };
295        if service_addresses.len() == 0 {
296            return Ok((String::new(), ServiceAddress::default()));
297        };
298        let mut tag = "";
299        if watch_service.tag.is_some() {
300            tag = watch_service.tag.as_ref().unwrap();
301        };
302        let key = format!("{}{}", watch_service.service_name, tag);
303        let service_addresses = ServiceAddress {
304            index,
305            address: service_addresses,
306            address_link: service_addresses_link,
307        };
308
309        Ok((key, service_addresses))
310    }
311
312    pub async fn random_policy(&self, service_name: &str, tag: &str) -> surf::Result<String> {
313        let key = format!("{}{}", service_name, tag);
314        let services_addresses = SERVICES_ADDRESS.clone();
315        let services_addresses = services_addresses.read().await;
316        let service_addresses = services_addresses.get(&key);
317        if service_addresses.is_some() {
318            let service_addresses = service_addresses.unwrap();
319            let range = service_addresses.address.len();
320            if range == 0 {
321                return Err(Error::from_str(
322                    StatusCode::BadRequest,
323                    "consul server address is empty",
324                ));
325            };
326            let mut r = rand::thread_rng();
327            let idx: usize = r.gen_range(0..range);
328            let address = service_addresses.address.get(idx).unwrap();
329            return Ok(String::from(address));
330        }
331        Err(Error::from_str(
332            StatusCode::BadRequest,
333            "consul server address is empty",
334        ))
335    }
336}
337
338/// Config is used to configure the creation of a client
339#[derive(Default, Debug, Clone, Serialize, Deserialize)]
340#[allow(non_snake_case)]
341pub struct Config {
342    /// Address is the address of the Consul server
343    pub address: Option<String>,
344
345    /// Scheme is the URI scheme for the Consul server
346    pub scheme: Option<String>,
347
348    /// Datacenter to use. If not provided, the default agent datacenter is used.
349    pub datacenter: Option<String>,
350
351    /// Transport is the Transport to use for the http client.
352    /// pub Transport: surf::Client,
353    /// HttpClient is the client to use. Default will be
354    /// used if not provided.
355    /// pub HttpClient: Option<surf::Client>,
356
357    /// HttpAuth is the auth info to use for http access.
358
359    /// pub HttpAuth: Option<http_types::auth::BasicAuth>,
360
361    /// WaitTime limits how long a Watch will block. If not provided,
362    /// the agent default values will be used.
363    pub wait_time: Option<String>,
364
365    /// Token is used to provide a per-request ACL token
366    /// which overrides the agent's default token.
367    pub token: Option<String>,
368
369    /// TokenFile is a file containing the current token to use for this client.
370    /// If provided it is read once at startup and never again.
371    pub token_file: Option<String>,
372
373    /// Namespace is the name of the namespace to send along for the request
374    /// when no other Namespace is present in the QueryOptions
375    pub namespace: Option<String>,
376
377    pub tls_config: Option<TLSConfig>,
378}
379
380/// TLSConfig is used to generate a TLSClientConfig that's useful for talking to
381/// Consul using TLS.
382#[derive(Default, Debug, Clone, Serialize, Deserialize)]
383#[allow(non_snake_case)]
384pub struct TLSConfig {
385    /// Address is the optional address of the Consul server. The port, if any
386    /// will be removed from here and this will be set to the ServerName of the
387    /// resulting config.
388    pub address: Option<String>,
389
390    /// CAFile is the optional path to the CA certificate used for Consul
391    /// communication, defaults to the system bundle if not specified.
392    pub ca_file: Option<String>,
393
394    /// CAPath is the optional path to a directory of CA certificates to use for
395    /// Consul communication, defaults to the system bundle if not specified.
396    pub ca_path: Option<String>,
397
398    /// CAPem is the optional PEM-encoded CA certificate used for Consul
399    /// communication, defaults to the system bundle if not specified.
400    pub ca_pem: Option<String>,
401
402    /// CertFile is the optional path to the certificate for Consul
403    /// communication. If this is set then you need to also set KeyFile.
404    pub cert_file: Option<String>,
405
406    /// CertPEM is the optional PEM-encoded certificate for Consul
407    /// communication. If this is set then you need to also set KeyPEM.
408    pub cert_pem: Option<String>,
409
410    /// KeyFile is the optional path to the private key for Consul communication.
411    /// If this is set then you need to also set CertFile.
412    pub key_file: Option<String>,
413
414    /// KeyPEM is the optional PEM-encoded private key for Consul communication.
415    /// If this is set then you need to also set CertPEM.
416    pub key_pem: Option<String>,
417
418    /// InsecureSkipVerify if set to true will disable TLS host verification.
419    pub insecure_skip_verify: Option<bool>,
420}
421
422/// QueryOptions are used to parameterize a query
423#[derive(Default, Debug, Clone, Serialize, Deserialize)]
424#[allow(non_snake_case)]
425pub struct QueryOptions {
426    /// Namespace overrides the `default` namespace
427    /// Note: Namespaces are available only in Consul Enterprise
428    pub Namespace: Option<String>,
429
430    /// Providing a datacenter overwrites the DC provided
431    /// by the Config
432    pub Datacenter: Option<String>,
433
434    /// AllowStale allows any Consul server (non-leader) to service
435    /// a read. This allows for lower latency and higher throughput
436    pub AllowStale: Option<bool>,
437
438    /// RequireConsistent forces the read to be fully consistent.
439    /// This is more expensive but prevents ever performing a stale
440    /// read.
441    pub RequireConsistent: Option<bool>,
442
443    /// UseCache requests that the agent cache results locally. See
444    /// https:///www.consul.io/api/features/caching.html for more details on the
445    /// semantics.
446    pub UseCache: Option<bool>,
447
448    /// MaxAge limits how old a cached value will be returned if UseCache is true.
449    /// If there is a cached response that is older than the MaxAge, it is treated
450    /// as a cache miss and a new fetch invoked. If the fetch fails, the error is
451    /// returned. Clients that wish to allow for stale results on error can set
452    /// StaleIfError to a longer duration to change this behavior. It is ignored
453    /// if the endpoint supports background refresh caching. See
454    /// https:///www.consul.io/api/features/caching.html for more details.
455    pub MaxAge: Option<time::Duration>,
456
457    /// StaleIfError specifies how stale the client will accept a cached response
458    /// if the servers are unavailable to fetch a fresh one. Only makes sense when
459    /// UseCache is true and MaxAge is set to a lower, non-zero value. It is
460    /// ignored if the endpoint supports background refresh caching. See
461    /// https:///www.consul.io/api/features/caching.html for more details.
462    pub StaleIfError: Option<time::Duration>,
463
464    /// WaitIndex is used to enable a blocking query. Waits
465    /// until the timeout or the next index is reached
466    pub WaitIndex: Option<usize>,
467
468    /// WaitHash is used by some endpoints instead of WaitIndex to perform blocking
469    /// on state based on a hash of the response rather than a monotonic index.
470    /// This is required when the state being blocked on is not stored in Raft, for
471    /// example agent-local proxy configuration.
472    pub WaitHash: Option<String>,
473
474    /// WaitTime is used to bound the duration of a wait.
475    /// Defaults to that of the Config, but can be overridden.
476    pub WaitTime: Option<time::Duration>,
477
478    /// Token is used to provide a per-request ACL token
479    /// which overrides the agent's default token.
480    pub Token: Option<String>,
481
482    /// Near is used to provide a node name that will sort the results
483    /// in ascending order based on the estimated round trip time from
484    /// that node. Setting this to "_agent" will use the agent's node
485    /// for the sort.
486    pub Near: Option<String>,
487
488    /// NodeMeta is used to filter results by nodes with the given
489    /// metadata key/value pairs. Currently, only one key/value pair can
490    /// be provided for filtering.
491    pub NodeMeta: Option<HashMap<String, String>>,
492
493    /// RelayFactor is used in keyring operations to cause responses to be
494    /// relayed back to the sender through N other random nodes. Must be
495    /// a value from 0 to 5 (inclusive).
496    pub RelayFactor: Option<u8>,
497
498    /// LocalOnly is used in keyring list operation to force the keyring
499    /// query to only hit local servers (no WAN traffic).
500    pub LocalOnly: Option<bool>,
501
502    /// Connect filters prepared query execution to only include Connect-capable
503    /// services. This currently affects prepared query execution.
504    pub Connect: Option<bool>,
505
506    /// Filter requests filtering data prior to it being returned. The string
507    /// is a go-bexpr compatible expression.
508    pub Filter: Option<String>,
509}
510
#[cfg(test)]
mod tests {
    use crate::api::{Config, CONSUL_CONFIG};
    use crate::watch::WatchService;
    use async_std::task::block_on;

    #[test]
    fn it_works() {
        test_watch_services()
    }

    /// Resolves the addresses of the `hyat_rust` service through the global
    /// configuration and prints the result. Sends a real request to
    /// `http://127.0.0.1:8500`, so an agent has to be reachable there.
    pub fn test_watch_services() {
        let mut consul = block_on(CONSUL_CONFIG.write());
        consul.config = Some(Config {
            datacenter: Some(String::from("dc1")),
            address: Some(String::from("http://127.0.0.1:8500")),
            ..Config::default()
        });

        let mut service = WatchService::default();
        service.service_name = String::from("hyat_rust");
        service.passing_only = Some(true);
        // consul.watch_services = Some(vec![service]);

        let resolved = block_on(consul.get_address(&service)).unwrap();
        println!("{:?}", resolved);
    }
}