use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt, TryFutureExt};
use log::*;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::select;
use tokio::sync::watch;
use crate::{Consul, WithIndex};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct Node {
pub node: String,
pub address: String,
#[serde(default, deserialize_with = "deserialize_null_default")]
pub meta: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct Service {
pub service: String,
pub address: String,
pub port: u16,
pub tags: Vec<String>,
#[serde(default, deserialize_with = "deserialize_null_default")]
pub meta: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct CatalogNode {
pub node: Node,
#[serde(default, deserialize_with = "deserialize_null_default")]
pub services: HashMap<String, Service>,
}
pub type ServiceList = HashMap<String, Vec<String>>;
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct ServiceNode {
pub node: String,
pub address: String,
#[serde(default, deserialize_with = "deserialize_null_default")]
pub node_meta: HashMap<String, String>,
pub service_name: String,
pub service_tags: Vec<String>,
pub service_address: String,
pub service_port: u16,
#[serde(default, deserialize_with = "deserialize_null_default")]
pub service_meta: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct HealthServiceNode {
pub node: Node,
pub service: Service,
pub checks: Vec<HealthCheck>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct HealthCheck {
pub node: String,
#[serde(rename = "CheckID")]
pub check_id: String,
pub name: String,
pub status: String,
pub output: String,
#[serde(rename = "Type")]
pub type_: String,
}
pub type AllServiceHealth = HashMap<String, Arc<[HealthServiceNode]>>;
impl Consul {
pub async fn catalog_node_list(
&self,
last_index: Option<usize>,
) -> Result<WithIndex<Vec<Node>>> {
self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
.await
}
pub async fn catalog_node(
&self,
host: &str,
last_index: Option<usize>,
) -> Result<WithIndex<Option<CatalogNode>>> {
self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
.await
}
pub async fn catalog_service_list(
&self,
last_index: Option<usize>,
) -> Result<WithIndex<ServiceList>> {
self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
.await
}
pub async fn catalog_service_nodes(
&self,
service: &str,
last_index: Option<usize>,
) -> Result<WithIndex<Vec<ServiceNode>>> {
self.get_with_index(
format!("{}/v1/catalog/service/{}", self.url, service),
last_index,
)
.await
}
pub async fn health_service_instances(
&self,
service: &str,
last_index: Option<usize>,
) -> Result<WithIndex<Vec<HealthServiceNode>>> {
self.get_with_index(
format!("{}/v1/health/service/{}", self.url, service),
last_index,
)
.await
}
pub fn watch_all_service_health(
&self,
max_retry_interval: Duration,
) -> watch::Receiver<AllServiceHealth> {
let (tx, rx) = watch::channel(HashMap::new());
tokio::spawn(do_watch_all_service_health(
self.clone(),
tx,
max_retry_interval,
));
rx
}
}
async fn do_watch_all_service_health(
consul: Consul,
tx: watch::Sender<AllServiceHealth>,
max_retry_interval: Duration,
) {
let mut services = AllServiceHealth::new();
let mut service_watchers =
FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
loop {
select! {
list_res = &mut service_list => {
match list_res {
Ok(list) => {
let list_index = list.index();
for service in list.into_inner().keys() {
if !services.contains_key(service) {
services.insert(service.to_string(), Arc::new([]));
let service = service.to_string();
service_watchers.push(Box::pin(async {
let res = consul.health_service_instances(&service, None).await
.map_err(|e| (1, e));
(service, res)
}));
}
}
service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
}
Err((err_count, e)) => {
warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
let consul = &consul;
service_list = Box::pin(async move {
tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
});
}
}
}
(service, watch_res) = service_watchers.next().then(some_or_pending) => {
match watch_res {
Ok(nodes) => {
let index = nodes.index();
let nodes = nodes.into_inner();
if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
services.insert(service.clone(), nodes.into());
if tx.send(services.clone()).is_err() {
break;
}
}
let consul = &consul;
service_watchers.push(Box::pin(async move {
let res = consul.health_service_instances(&service, Some(index)).await
.map_err(|e| (1, e));
(service, res)
}));
}
Err((err_count, e)) => {
warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
let consul = &consul;
service_watchers.push(Box::pin(async move {
tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
(service, res)
}));
}
}
}
_ = tx.closed() => {
break;
}
}
}
}
async fn some_or_pending<T>(value: Option<T>) -> T {
match value {
Some(v) => v,
None => futures::future::pending().await,
}
}
fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
Duration::from_secs_f64(
max_time.as_secs_f64().min(2.0f64 * 1.5f64.powf(retries as f64))
)
}
fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
T: Default + Deserialize<'de>,
D: Deserializer<'de>,
{
Option::deserialize(deserializer).map(Option::unwrap_or_default)
}