1use std::cmp;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Result;
12use futures::future::BoxFuture;
13use futures::stream::futures_unordered::FuturesUnordered;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use log::*;
16use serde::{Deserialize, Deserializer, Serialize};
17use tokio::select;
18use tokio::sync::watch;
19
20use crate::{Consul, WithIndex};
21
22#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
25#[serde(rename_all = "PascalCase")]
26pub struct Node {
27 pub node: String,
28 pub address: String,
29 #[serde(default, deserialize_with = "deserialize_null_default")]
30 pub meta: HashMap<String, String>,
31}
32
33#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
35#[serde(rename_all = "PascalCase")]
36pub struct Service {
37 pub service: String,
38 pub address: String,
39 pub port: u16,
40 pub tags: Vec<String>,
41 #[serde(default, deserialize_with = "deserialize_null_default")]
42 pub meta: HashMap<String, String>,
43}
44
45#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
48#[serde(rename_all = "PascalCase")]
49pub struct CatalogNode {
50 pub node: Node,
51 #[serde(default, deserialize_with = "deserialize_null_default")]
52 pub services: HashMap<String, Service>,
53}
54
/// Response shape decoded by [`Consul::catalog_service_list`]: a map from
/// `String` keys to lists of strings.
///
/// The watcher in this file uses the keys as service names. The values are
/// presumably each service's tags; confirm against the Consul API.
pub type ServiceList = HashMap<String, Vec<String>>;
58
59#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
62#[serde(rename_all = "PascalCase")]
63pub struct ServiceNode {
64 pub node: String,
65 pub address: String,
66 #[serde(default, deserialize_with = "deserialize_null_default")]
67 pub node_meta: HashMap<String, String>,
68 pub service_name: String,
69 pub service_tags: Vec<String>,
70 pub service_address: String,
71 pub service_port: u16,
72 #[serde(default, deserialize_with = "deserialize_null_default")]
73 pub service_meta: HashMap<String, String>,
74}
75
76#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
80#[serde(rename_all = "PascalCase")]
81pub struct HealthServiceNode {
82 pub node: Node,
83 pub service: Service,
84 pub checks: Vec<HealthCheck>,
85}
86
87#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
89#[serde(rename_all = "PascalCase")]
90pub struct HealthCheck {
91 pub node: String,
92 #[serde(rename = "CheckID")]
93 pub check_id: String,
94 pub name: String,
95 pub status: String,
96 pub output: String,
97 #[serde(rename = "Type")]
98 pub type_: String,
99}
100
101pub type AllServiceHealth = HashMap<String, Arc<[HealthServiceNode]>>;
104
105impl Consul {
106 pub async fn catalog_node_list(
110 &self,
111 last_index: Option<usize>,
112 ) -> Result<WithIndex<Vec<Node>>> {
113 self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
114 .await
115 }
116
117 pub async fn catalog_node(
121 &self,
122 host: &str,
123 last_index: Option<usize>,
124 ) -> Result<WithIndex<Option<CatalogNode>>> {
125 self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
126 .await
127 }
128
129 pub async fn catalog_service_list(
133 &self,
134 last_index: Option<usize>,
135 ) -> Result<WithIndex<ServiceList>> {
136 self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
137 .await
138 }
139
140 pub async fn catalog_service_nodes(
144 &self,
145 service: &str,
146 last_index: Option<usize>,
147 ) -> Result<WithIndex<Vec<ServiceNode>>> {
148 self.get_with_index(
149 format!("{}/v1/catalog/service/{}", self.url, service),
150 last_index,
151 )
152 .await
153 }
154
155 pub async fn health_service_instances(
159 &self,
160 service: &str,
161 last_index: Option<usize>,
162 ) -> Result<WithIndex<Vec<HealthServiceNode>>> {
163 self.get_with_index(
164 format!("{}/v1/health/service/{}", self.url, service),
165 last_index,
166 )
167 .await
168 }
169
170 pub fn watch_all_service_health(
174 &self,
175 max_retry_interval: Duration,
176 ) -> watch::Receiver<AllServiceHealth> {
177 let (tx, rx) = watch::channel(HashMap::new());
178
179 tokio::spawn(do_watch_all_service_health(
180 self.clone(),
181 tx,
182 max_retry_interval,
183 ));
184
185 rx
186 }
187}
188
189async fn do_watch_all_service_health(
190 consul: Consul,
191 tx: watch::Sender<AllServiceHealth>,
192 max_retry_interval: Duration,
193) {
194 let mut services = AllServiceHealth::new();
195 let mut service_watchers =
196 FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
197 let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
198 Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
199
200 loop {
201 select! {
202 list_res = &mut service_list => {
203 match list_res {
204 Ok(list) => {
205 let list_index = list.index();
206 for service in list.into_inner().keys() {
207 if !services.contains_key(service) {
208 services.insert(service.to_string(), Arc::new([]));
209
210 let service = service.to_string();
211 service_watchers.push(Box::pin(async {
212 let res = consul.health_service_instances(&service, None).await
213 .map_err(|e| (1, e));
214 (service, res)
215 }));
216 }
217 }
218 service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
219 }
220 Err((err_count, e)) => {
221 warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
222 let consul = &consul;
223 service_list = Box::pin(async move {
224 tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
225 consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
226 });
227 }
228 }
229 }
230 (service, watch_res) = service_watchers.next().then(some_or_pending) => {
231 match watch_res {
232 Ok(nodes) => {
233 let index = nodes.index();
234
235 let nodes = nodes.into_inner();
236 if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
237 services.insert(service.clone(), nodes.into());
238 if tx.send(services.clone()).is_err() {
239 break;
240 }
241 }
242
243 let consul = &consul;
244 service_watchers.push(Box::pin(async move {
245 let res = consul.health_service_instances(&service, Some(index)).await
246 .map_err(|e| (1, e));
247 (service, res)
248 }));
249 }
250 Err((err_count, e)) => {
251 warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
252 let consul = &consul;
253 service_watchers.push(Box::pin(async move {
254 tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
255 let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
256 (service, res)
257 }));
258 }
259 }
260 }
261 _ = tx.closed() => {
262 break;
263 }
264 }
265 }
266}
267
268async fn some_or_pending<T>(value: Option<T>) -> T {
269 match value {
270 Some(v) => v,
271 None => futures::future::pending().await,
272 }
273}
274
/// Computes the delay to wait before retrying after `retries` consecutive
/// errors.
///
/// The delay grows exponentially as `2 s * 1.5^retries` and is capped at
/// `max_time`.
///
/// When the cap applies, `max_time` itself is returned rather than a value
/// rebuilt from its `f64` representation. This keeps the result exact and
/// avoids the panic `Duration::from_secs_f64` raises for values it cannot
/// represent (for instance when `max_time` is `Duration::MAX`).
fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
    let backoff_secs = 2.0f64 * 1.5f64.powf(retries as f64);

    if backoff_secs >= max_time.as_secs_f64() {
        // Also covers an infinite back-off produced by a very large `retries`.
        max_time
    } else {
        // Finite, non-negative and strictly below the cap, so the conversion
        // cannot overflow.
        Duration::from_secs_f64(backoff_secs)
    }
}
282
283fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
284where
285 T: Default + Deserialize<'de>,
286 D: Deserializer<'de>,
287{
288 Option::deserialize(deserializer).map(Option::unwrap_or_default)
289}