df_consul/
catalog.rs

1//! Contains structures to interact with the catalog API
2//!
3//! See <https://developer.hashicorp.com/consul/api-docs/catalog>
4//! for the full definition of the API.
5
6use std::cmp;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Result;
12use futures::future::BoxFuture;
13use futures::stream::futures_unordered::FuturesUnordered;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use log::*;
16use serde::{Deserialize, Deserializer, Serialize};
17use tokio::select;
18use tokio::sync::watch;
19
20use crate::{Consul, WithIndex};
21
22/// Node summary, as specified in response to "list nodes" API calls in
23/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
24#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
25#[serde(rename_all = "PascalCase")]
26pub struct Node {
27    pub node: String,
28    pub address: String,
29    #[serde(default, deserialize_with = "deserialize_null_default")]
30    pub meta: HashMap<String, String>,
31}
32
33/// One of the services returned in a CatalogNode
34#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
35#[serde(rename_all = "PascalCase")]
36pub struct Service {
37    pub service: String,
38    pub address: String,
39    pub port: u16,
40    pub tags: Vec<String>,
41    #[serde(default, deserialize_with = "deserialize_null_default")]
42    pub meta: HashMap<String, String>,
43}
44
45/// Full node info, as specified in response to "retrieve map of services for a node" API call in
46/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
47#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
48#[serde(rename_all = "PascalCase")]
49pub struct CatalogNode {
50    pub node: Node,
51    #[serde(default, deserialize_with = "deserialize_null_default")]
52    pub services: HashMap<String, Service>,
53}
54
/// Concise service list, as specified in response to "list services" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
///
/// Keys are service names (they are what `do_watch_all_service_health` passes to the
/// health API).
/// NOTE(review): the values are presumably the tags known for each service — confirm
/// against the endpoint documentation.
pub type ServiceList = HashMap<String, Vec<String>>;
58
59/// Node serving a service, as specified in response to "list nodes for a service" API call in
60/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
61#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
62#[serde(rename_all = "PascalCase")]
63pub struct ServiceNode {
64    pub node: String,
65    pub address: String,
66    #[serde(default, deserialize_with = "deserialize_null_default")]
67    pub node_meta: HashMap<String, String>,
68    pub service_name: String,
69    pub service_tags: Vec<String>,
70    pub service_address: String,
71    pub service_port: u16,
72    #[serde(default, deserialize_with = "deserialize_null_default")]
73    pub service_meta: HashMap<String, String>,
74}
75
76/// Node serving a service with health info,
77/// as specified in response to "list service instances for a service" health API call in
78/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
79#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
80#[serde(rename_all = "PascalCase")]
81pub struct HealthServiceNode {
82    pub node: Node,
83    pub service: Service,
84    pub checks: Vec<HealthCheck>,
85}
86
87/// A health check as returned in HealthServiceNode
88#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
89#[serde(rename_all = "PascalCase")]
90pub struct HealthCheck {
91    pub node: String,
92    #[serde(rename = "CheckID")]
93    pub check_id: String,
94    pub name: String,
95    pub status: String,
96    pub output: String,
97    #[serde(rename = "Type")]
98    pub type_: String,
99}
100
/// Map containing all services and their associated nodes, with health checks,
/// returned by `watch_all_service_health`
///
/// Keys are service names. Each value is the list of instances most recently returned
/// by the health API for that service, stored behind an `Arc` so that cloning the map
/// for every published update does not copy the instance lists themselves.
pub type AllServiceHealth = HashMap<String, Arc<[HealthServiceNode]>>;
104
105impl Consul {
106    /// The "list nodes" API call of the Catalog API
107    ///
108    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
109    pub async fn catalog_node_list(
110        &self,
111        last_index: Option<usize>,
112    ) -> Result<WithIndex<Vec<Node>>> {
113        self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
114            .await
115    }
116
117    /// The "retrieve map of services for a node" API call of the Catalog API
118    ///
119    /// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
120    pub async fn catalog_node(
121        &self,
122        host: &str,
123        last_index: Option<usize>,
124    ) -> Result<WithIndex<Option<CatalogNode>>> {
125        self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
126            .await
127    }
128
129    /// The "list services" API call of the Catalog api
130    ///
131    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
132    pub async fn catalog_service_list(
133        &self,
134        last_index: Option<usize>,
135    ) -> Result<WithIndex<ServiceList>> {
136        self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
137            .await
138    }
139
140    /// The "list nodes for a service" API call of the Catalog api
141    ///
142    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
143    pub async fn catalog_service_nodes(
144        &self,
145        service: &str,
146        last_index: Option<usize>,
147    ) -> Result<WithIndex<Vec<ServiceNode>>> {
148        self.get_with_index(
149            format!("{}/v1/catalog/service/{}", self.url, service),
150            last_index,
151        )
152        .await
153    }
154
155    /// The "list service instances for a service" API call of the Health api
156    ///
157    /// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
158    pub async fn health_service_instances(
159        &self,
160        service: &str,
161        last_index: Option<usize>,
162    ) -> Result<WithIndex<Vec<HealthServiceNode>>> {
163        self.get_with_index(
164            format!("{}/v1/health/service/{}", self.url, service),
165            last_index,
166        )
167        .await
168    }
169
170    /// Launches a background task that watches all services and the nodes that serve them,
171    /// and make that info available in a tokio watch channel.
172    /// The worker terminates when the channel is dropped.
173    pub fn watch_all_service_health(
174        &self,
175        max_retry_interval: Duration,
176    ) -> watch::Receiver<AllServiceHealth> {
177        let (tx, rx) = watch::channel(HashMap::new());
178
179        tokio::spawn(do_watch_all_service_health(
180            self.clone(),
181            tx,
182            max_retry_interval,
183        ));
184
185        rx
186    }
187}
188
189async fn do_watch_all_service_health(
190    consul: Consul,
191    tx: watch::Sender<AllServiceHealth>,
192    max_retry_interval: Duration,
193) {
194    let mut services = AllServiceHealth::new();
195    let mut service_watchers =
196        FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
197    let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
198        Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
199
200    loop {
201        select! {
202            list_res = &mut service_list => {
203                match list_res {
204                    Ok(list) => {
205                        let list_index = list.index();
206                        for service in list.into_inner().keys() {
207                            if !services.contains_key(service) {
208                                services.insert(service.to_string(), Arc::new([]));
209
210                                let service = service.to_string();
211                                service_watchers.push(Box::pin(async {
212                                    let res = consul.health_service_instances(&service, None).await
213                                        .map_err(|e| (1, e));
214                                    (service, res)
215                                }));
216                            }
217                        }
218                        service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
219                    }
220                    Err((err_count, e)) => {
221                        warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
222                        let consul = &consul;
223                        service_list = Box::pin(async move {
224                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
225                            consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
226                        });
227                    }
228                }
229            }
230            (service, watch_res) = service_watchers.next().then(some_or_pending) => {
231                match watch_res {
232                    Ok(nodes) => {
233                        let index = nodes.index();
234
235                        let nodes =  nodes.into_inner();
236                        if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
237                            services.insert(service.clone(), nodes.into());
238                            if tx.send(services.clone()).is_err() {
239                                break;
240                            }
241                        }
242
243                        let consul = &consul;
244                        service_watchers.push(Box::pin(async move {
245                            let res = consul.health_service_instances(&service, Some(index)).await
246                                .map_err(|e| (1, e));
247                            (service, res)
248                        }));
249                    }
250                    Err((err_count, e)) => {
251                        warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
252                        let consul = &consul;
253                        service_watchers.push(Box::pin(async move {
254                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
255                            let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
256                            (service, res)
257                        }));
258                    }
259                }
260            }
261            _ = tx.closed() => {
262                break;
263            }
264        }
265    }
266}
267
268async fn some_or_pending<T>(value: Option<T>) -> T {
269    match value {
270        Some(v) => v,
271        None => futures::future::pending().await,
272    }
273}
274
/// Computes the delay to wait before retrying after `retries` consecutive errors.
///
/// The delay is `2 * 1.5^retries` seconds, capped at `max_time`. Callers in this file
/// start counting at 1, so the first retry waits 3 seconds, then 4.5, and so on.
///
/// When the cap applies, `max_time` is returned as-is rather than being rebuilt from
/// its `f64` value: this keeps it exact and avoids the overflow panic of
/// `Duration::from_secs_f64` for caps close to `Duration::MAX`.
fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
    // Always >= 2.0 and never NaN; becomes +inf for very large `retries`.
    let backoff_secs = 2.0f64 * 1.5f64.powf(retries as f64);

    if backoff_secs >= max_time.as_secs_f64() {
        max_time
    } else {
        // Strictly below the cap, hence finite and representable as a `Duration`.
        Duration::from_secs_f64(backoff_secs)
    }
}
282
283fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
284where
285    T: Default + Deserialize<'de>,
286    D: Deserializer<'de>,
287{
288    Option::deserialize(deserializer).map(Option::unwrap_or_default)
289}