use serde_derive::{Serialize, Deserialize};
use lazy_static::lazy_static;
use std::collections::HashMap;
use serde_json::{Value, Map};
use async_std::sync::{Arc, RwLock};
use surf::{Error, StatusCode};
use surf::http::Method;
use async_std::task::block_on;
use super::health;
use super::catalog;
use super::config_entry;
use super::api;
/// String alias used for the `Kind` field of the service structs in this module.
type ServiceKind = String;

lazy_static! {
    /// Kind of an ordinary service: the empty string.
    pub static ref SERVICE_KIND_TYPICAL: ServiceKind = String::new();
    /// Kind value `connect-proxy`.
    pub static ref SERVICE_KIND_CONNECT_PROXY: ServiceKind = "connect-proxy".to_owned();
    /// Kind value `mesh-gateway`.
    pub static ref SERVICE_KIND_MESH_GATEWAY: ServiceKind = "mesh-gateway".to_owned();
    /// Kind value `terminating-gateway`.
    pub static ref SERVICE_KIND_TERMINATING_GATEWAY: ServiceKind =
        "terminating-gateway".to_owned();
}
/// String alias used for [`Upstream::DestinationType`].
type UpstreamDestType = String;

lazy_static! {
    /// Destination type value `service`.
    pub static ref UPSTREAM_DEST_TYPE_SERVICE: UpstreamDestType = "service".to_owned();
    /// Destination type value `prepared_query`.
    pub static ref UPSTREAM_DEST_TYPE_PREPARED_QUERY: UpstreamDestType =
        "prepared_query".to_owned();
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentCheck {
pub Node: Option<String>,
pub CheckID: Option<String>,
pub Name: Option<String>,
pub Status: Option<String>,
pub Notes: Option<String>,
pub Output: Option<String>,
pub ServiceID: Option<String>,
pub ServiceName: Option<String>,
pub Type: Option<String>,
pub Definition: Option<health::HealthCheckDefinition>,
pub Namespace: String,
}
/// Filter expression passed to the agent as the `filter` query parameter.
///
/// `Agent::checks_with_filter_opts` and `Agent::services_with_filter_opts`
/// only attach it to the request when `filter` is non-empty.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct Filter {
/// Filter expression text; the empty string (the default) means "no filter".
pub filter: String,
}
/// Optional `Passing` / `Warning` weights carried by [`AgentService`] and
/// [`AgentServiceRegistration`].
// NOTE(review): presumably the weights applied while the service's checks are
// passing or warning — confirm against the Consul documentation.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentWeights {
pub Passing: Option<usize>,
pub Warning: Option<usize>,
}
/// A service as reported by the agent.
///
/// `Agent::services_with_filter_opts` deserializes the body of
/// `/v1/agent/services` into a map of these. Every field is optional, so keys
/// missing from the payload deserialize to `None`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentService {
/// Service kind; see the `SERVICE_KIND_*` statics for the values defined here.
pub Kind: Option<ServiceKind>,
pub ID: Option<String>,
pub Service: Option<String>,
pub Tags: Option<Vec<String>>,
pub Meta: Option<HashMap<String, String>>,
pub Port: Option<usize>,
pub Address: Option<String>,
pub TaggedAddresses: Option<HashMap<String, catalog::ServiceAddress>>,
pub Weights: Option<AgentWeights>,
pub EnableTagOverride: Option<bool>,
pub CreateIndex: Option<u64>,
pub ModifyIndex: Option<u64>,
pub ContentHash: Option<String>,
pub Proxy: Option<AgentServiceConnectProxyConfig>,
pub Connect: Option<AgentServiceConnect>,
pub Namespace: Option<String>,
pub Datacenter: Option<String>,
}
/// A service bundled with its checks and an aggregated status string.
// NOTE(review): no method in this file produces this type; presumably it
// models a per-service health response — confirm against the caller.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentServiceChecksInfo {
pub AggregatedStatus: Option<String>,
pub Service: Option<AgentService>,
pub Checks: Option<health::HealthChecks>,
}
/// Connect settings attached to a service or a service registration.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentServiceConnect {
pub Native: Option<bool>,
/// Nested registration for a sidecar service. It is boxed because
/// `AgentServiceRegistration` itself holds an `AgentServiceConnect`, which
/// would otherwise make the type infinitely sized.
pub SidecarService: Box<Option<AgentServiceRegistration>>,
}
/// Proxy configuration carried in the `Proxy` field of [`AgentService`] and
/// [`AgentServiceRegistration`].
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentServiceConnectProxyConfig {
pub DestinationServiceName: Option<String>,
pub DestinationServiceID: Option<String>,
pub LocalServiceAddress: Option<String>,
// NOTE(review): the sibling `Port` fields in this file are numeric; if the
// agent sends this port as a JSON number, deserializing into `String` will
// fail — confirm against the Consul API.
pub LocalServicePort: Option<String>,
pub Mode: Option<config_entry::ProxyMode>,
// NOTE(review): presumably a structured object in the Consul API rather than
// a string — confirm before relying on this field.
pub TransparentProxy: Option<String>,
/// Free-form proxy settings, kept as raw JSON values.
pub Config: Option<HashMap<String, Value>>,
pub Upstreams: Option<Vec<Upstream>>,
pub MeshGateway: Option<config_entry::MeshGatewayConfig>,
pub Expose: Option<config_entry::ExposeConfig>,
}
// Keys and values looked up in `AgentMember::Tags`.
// NOTE(review): only the ACL-mode and role constants are used in this file;
// the meaning of the remaining ones is inferred from their names — confirm
// against the Consul sources.

/// Tag key read by `AgentMember::acl_mode`.
pub const MEMBER_TAG_KEY_ACL_MODE: &str = "acls";
/// Tag key read by `AgentMember::is_consul_server`.
pub const MEMBER_TAG_KEY_ROLE: &str = "role";
/// Value of the role tag that `AgentMember::is_consul_server` treats as a server.
pub const MEMBER_TAG_VALUE_ROLE_SERVER: &str = "consul";
pub const MEMBER_TAG_KEY_SEGMENT: &str = "segment";
pub const MEMBER_TAG_KEY_BOOTSTRAP: &str = "bootstrap";
pub const MEMBER_TAG_VALUE_BOOTSTRAP: &str = "1";
pub const MEMBER_TAG_KEY_BOOTSTRAP_EXPECT: &str = "expect";
pub const MEMBER_TAG_KEY_USE_TLS: &str = "use_tls";
pub const MEMBER_TAG_VALUE_USE_TLS: &str = "1";
pub const MEMBER_TAG_KEY_READ_REPLICA: &str = "read_replica";
pub const MEMBER_TAG_VALUE_READ_REPLICA: &str = "1";
/// String alias for the ACL mode returned by `AgentMember::acl_mode`.
pub type MemberACLMode = String;

lazy_static! {
    /// ACL mode `"0"` (disabled).
    pub static ref ACL_MODE_DISABLED: Arc<MemberACLMode> = Arc::new("0".to_owned());
    /// ACL mode `"1"` (enabled).
    pub static ref ACL_MODE_ENABLED: Arc<MemberACLMode> = Arc::new("1".to_owned());
    /// ACL mode `"2"` (legacy).
    pub static ref ACL_MODE_LEGACY: Arc<MemberACLMode> = Arc::new("2".to_owned());
    /// ACL mode `"3"` (unknown); also the fallback of `AgentMember::acl_mode`.
    pub static ref ACL_MODE_UNKNOWN: Arc<MemberACLMode> = Arc::new("3".to_owned());
}
/// A cluster member as described by the agent.
///
/// `Tags` is the field inspected by `acl_mode` and `is_consul_server`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentMember {
pub Name: Option<String>,
pub Addr: Option<String>,
pub Port: Option<u16>,
/// Free-form member tags; see the `MEMBER_TAG_*` constants for known keys.
pub Tags: Option<HashMap<String, String>>,
pub Status: Option<isize>,
pub ProtocolMin: Option<u8>,
pub ProtocolMax: Option<u8>,
pub ProtocolCur: Option<u8>,
pub DelegateMin: Option<u8>,
pub DelegateMax: Option<u8>,
pub DelegateCur: Option<u8>,
}
impl AgentMember {
    /// Returns the ACL mode advertised in this member's tags.
    ///
    /// The value stored under [`MEMBER_TAG_KEY_ACL_MODE`] is compared with the
    /// disabled, enabled and legacy mode strings. If the tag map is absent,
    /// the key is missing, or the value matches none of them, the unknown
    /// mode is returned.
    pub async fn acl_mode(&self) -> MemberACLMode {
        let tag = self
            .Tags
            .as_ref()
            .and_then(|tags| tags.get(MEMBER_TAG_KEY_ACL_MODE))
            .map(String::as_str);

        // Compare borrowed strings; only the returned mode is allocated.
        let mode = match tag {
            Some(t) if t == ACL_MODE_DISABLED.as_str() => ACL_MODE_DISABLED.as_str(),
            Some(t) if t == ACL_MODE_ENABLED.as_str() => ACL_MODE_ENABLED.as_str(),
            Some(t) if t == ACL_MODE_LEGACY.as_str() => ACL_MODE_LEGACY.as_str(),
            _ => ACL_MODE_UNKNOWN.as_str(),
        };
        mode.to_owned()
    }

    /// Returns `true` when the member's [`MEMBER_TAG_KEY_ROLE`] tag equals
    /// [`MEMBER_TAG_VALUE_ROLE_SERVER`]; `false` when the tag map or the key
    /// is missing, or the value differs.
    pub async fn is_consul_server(&self) -> bool {
        self.Tags
            .as_ref()
            .and_then(|tags| tags.get(MEMBER_TAG_KEY_ROLE))
            .map_or(false, |role| role.as_str() == MEMBER_TAG_VALUE_ROLE_SERVER)
    }
}
lazy_static! {
    /// Shared string `_all`.
    // NOTE(review): presumably the segment name selecting every segment in
    // `MembersOpts::Segment` — confirm against the caller.
    pub static ref ALL_SEGMENTS: Arc<String> = Arc::new("_all".to_owned());
}
/// Options for listing cluster members.
// NOTE(review): no method in this file consumes this type; field meanings
// (WAN pool, network segment) are inferred from the names — confirm.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct MembersOpts {
pub WAN: Option<bool>,
pub Segment: Option<String>,
}
/// Description of a service to register with the agent.
///
/// `Agent::service_register` and `Agent::service_register_opts` serialize
/// this as the JSON body of `PUT /v1/agent/service/register`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentServiceRegistration {
/// Service kind; see the `SERVICE_KIND_*` statics for the values defined here.
pub Kind: Option<ServiceKind>,
pub ID: Option<String>,
pub Name: Option<String>,
pub Tags: Option<Vec<String>>,
pub Port: Option<usize>,
pub Address: Option<String>,
pub TaggedAddresses: Option<HashMap<String, catalog::ServiceAddress>>,
pub EnableTagOverride: Option<bool>,
pub Meta: Option<HashMap<String, String>>,
pub Weights: Option<AgentWeights>,
/// A single check definition.
pub Check: Option<AgentServiceCheck>,
/// A list of check definitions.
pub Checks: Option<AgentServiceChecks>,
pub Proxy: Option<AgentServiceConnectProxyConfig>,
pub Connect: Option<AgentServiceConnect>,
}
/// Options for service registration.
///
/// Serialized as the request's query string; the registration code only
/// attaches it when `ReplaceExistingChecks` is `true`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct ServiceRegisterOpts {
/// Appears on the wire as the `replace-existing-checks` parameter.
#[serde(rename = "replace-existing-checks")]
pub ReplaceExistingChecks: bool,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct ConnectProxyConfig {
pub ProxyServiceID: Option<String>,
pub TargetServiceID: Option<String>,
pub TargetServiceName: Option<String>,
pub ContentHash: Option<String>,
pub Config: HashMap<String, Value>,
pub Upstreams: Vec<Upstream>,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Upstream {
pub DestinationType: Option<UpstreamDestType>,
pub DestinationNamespace: Option<String>,
pub DestinationName: Option<String>,
pub Datacenter: Option<String>,
pub LocalBindAddress: Option<String>,
pub LocalBindPort: Option<usize>,
pub Config: HashMap<String, Value>,
pub MeshGateway: Option<config_entry::MeshGatewayConfig>,
pub CentrallyConfigured: Option<bool>,
}
/// List of check definitions, as used by `AgentServiceRegistration::Checks`.
type AgentServiceChecks = Vec<AgentServiceCheck>;
/// Definition of a check supplied when registering a service.
///
/// Every field is optional.
// NOTE(review): which fields may be combined (script `Args`, `HTTP`, `TCP`,
// `GRPC`, `TTL`, alias checks, ...) is decided by the agent and is not
// enforced here — confirm against the Consul check documentation.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct AgentServiceCheck {
pub CheckID: Option<String>,
pub Name: Option<String>,
pub Args: Option<Vec<String>>,
pub DockerContainerID: Option<String>,
pub Shell: Option<String>,
// Interval, Timeout and TTL are plain strings here.
// NOTE(review): presumably duration strings such as "10s" — confirm.
pub Interval: Option<String>,
pub Timeout: Option<String>,
pub TTL: Option<String>,
pub HTTP: Option<String>,
pub Header: Option<HashMap<String, String>>,
pub Method: Option<String>,
pub Body: Option<String>,
pub TCP: Option<String>,
pub Status: Option<String>,
pub Notes: Option<String>,
pub TLSServerName: Option<String>,
pub TLSSkipVerify: Option<bool>,
pub GRPC: Option<String>,
pub GRPCUseTLS: Option<bool>,
pub AliasNode: Option<String>,
pub AliasService: Option<String>,
pub SuccessBeforePassing: Option<i64>,
pub FailuresBeforeCritical: Option<i64>,
pub DeregisterCriticalServiceAfter: Option<String>,
}
/// Handle for the agent endpoints (`/v1/agent/...`).
///
/// The type is `Copy`, so methods copy the client out of `&self` rather than
/// borrowing it.
#[derive(Default, Debug, Copy, Clone)]
#[allow(non_snake_case)]
pub struct Agent {
/// API client used to build requests. Most request methods return a
/// "client init err" error when this is `None`.
pub c: Option<api::Client>,
/// Node name returned by `node_name` when set and non-empty; otherwise
/// `node_name` queries the agent for it.
nodeName: Option<&'static str>,
}
lazy_static! {
    /// Shared [`Agent`], built from `api::CLIENT` on first access.
    ///
    /// Initialization uses `block_on`, so the first access blocks the calling
    /// thread until the client lock is taken and the agent is created.
    pub static ref AGENT: Arc<RwLock<Agent>> = {
        let shared_client = api::CLIENT.clone();
        let guard = block_on(shared_client.read());
        let initial = block_on(guard.agent());
        Arc::new(RwLock::new(initial))
    };
}
/// Metrics snapshot; `Agent::metrics` deserializes the body of
/// `/v1/agent/metrics` into this type.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct MetricsInfo {
pub Timestamp: Option<String>,
pub Gauges: Option<Vec<GaugeValue>>,
pub Points: Option<Vec<PointValue>>,
// Counters and Samples share the same element type.
pub Counters: Option<Vec<SampledValue>>,
pub Samples: Option<Vec<SampledValue>>,
}
/// One entry of `MetricsInfo::Gauges`: a named `f32` value with optional labels.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct GaugeValue {
pub Name: Option<String>,
pub Value: Option<f32>,
pub Labels: Option<HashMap<String, String>>,
}
/// One entry of `MetricsInfo::Points`: a named list of `f32` values.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct PointValue {
pub Name: Option<String>,
pub Points: Option<Vec<f32>>,
}
/// One entry of `MetricsInfo::Counters` or `MetricsInfo::Samples`: a named
/// set of summary statistics with optional labels.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct SampledValue {
pub Name: Option<String>,
pub Count: Option<i64>,
pub Sum: Option<f64>,
pub Min: Option<f64>,
pub Max: Option<f64>,
pub Mean: Option<f64>,
pub Stddev: Option<f64>,
pub Labels: Option<HashMap<String, String>>,
}
impl Agent {
pub async fn reload_client(){
let client = api::CLIENT.clone();
let client = client.read().await;
let s = client.agent().await;
let agent = AGENT.clone();
let mut agent = agent.write().await;
*agent = s;
}
pub async fn my_self(&self) -> surf::Result<Map<String, Value>> {
if self.c.is_some() {
let client = self.c.unwrap();
let req = client.new_request(Method::Get,
"/v1/agent/self".to_string()).await?;
let client = surf::Client::new();
let mut res = client.send(req).await?;
let body: Map<String, Value> = res.body_json().await?;
Ok(body)
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn host(&self) -> surf::Result<HashMap<String, Value>> {
let client = self.c.unwrap();
let req = client.new_request(Method::Get,
"/v1/agent/host".to_string()).await?;
let client = surf::Client::new();
let mut res = client.send(req).await?;
let body: HashMap<String, Value> = res.body_json().await?;
Ok(body)
}
pub async fn metrics(&self) -> surf::Result<MetricsInfo> {
if self.c.is_some() {
let client = self.c.unwrap();
let req = client.new_request(Method::Get,
"/v1/agent/metrics".to_string()).await?;
let client = surf::Client::new();
let mut res = client.send(req).await?;
let body: MetricsInfo = res.body_json().await?;
Ok(body)
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn reload(&self) -> surf::Result<()> {
if self.c.is_some() {
let client = self.c.unwrap();
let req = client.new_request(Method::Put,
"/v1/agent/reload".to_string()).await?;
let client = surf::Client::new();
client.send(req).await?;
Ok(())
} else {
Err(Error::from_str(400, "client init err"))
}
}
async fn node(&self) -> surf::Result<String> {
let info = self.my_self().await?;
let config = info.get("Config").expect("Config key is not exist");
let name = config.get("NodeName").expect("node name is not exist");
let name = name.to_string();
Ok(name)
}
pub async fn node_name(&self) -> surf::Result<String> {
if self.nodeName.is_some() {
let node_name = self.nodeName.unwrap();
if node_name != "" {
Ok(node_name.to_string())
} else {
let node_name = self.node().await?;
Ok(node_name)
}
} else {
let node_name = self.node().await?;
Ok(node_name)
}
}
pub async fn checks(&self) -> surf::Result<HashMap<String, AgentCheck>> {
let val = self.checks_with_filter(Filter::default()).await?;
Ok(val)
}
pub async fn checks_with_filter(&self, filter: Filter) -> surf::Result<HashMap<String, AgentCheck>> {
let val = self.checks_with_filter_opts(filter,
None).await?;
Ok(val)
}
pub async fn checks_with_filter_opts(&self,
filter: Filter,
opts: Option<api::QueryOptions>) -> surf::Result<HashMap<String, AgentCheck>> {
if self.c.is_some() {
let client = self.c.unwrap();
let mut req = client.new_request(Method::Get,
"/v1/agent/checks".to_string()).await?;
if opts.is_some() {
req.set_query(&opts.unwrap())?;
}
if filter.filter != "" {
req.set_query(&filter)?;
};
let client = surf::Client::new();
let mut res = client.send(req).await?;
let body: HashMap<String, AgentCheck> = res.body_json().await?;
Ok(body)
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn services(&self) -> surf::Result<HashMap<String, AgentService>> {
return self.services_with_filter(Filter::default()).await;
}
pub async fn services_with_filter(&self, filter: Filter) -> surf::Result<HashMap<String, AgentService>> {
return self.services_with_filter_opts(filter, None).await;
}
pub async fn services_with_filter_opts(&self, filter: Filter, opts: Option<api::QueryOptions>) -> surf::Result<HashMap<String, AgentService>> {
if self.c.is_some() {
let client = self.c.unwrap();
let mut req = client.new_request(Method::Get,
"/v1/agent/services".to_string()).await?;
if opts.is_some() {
req.set_query(&opts.unwrap())?;
}
if filter.filter != "" {
req.set_query(&filter)?;
}
let client = surf::Client::new();
let mut res = client.send(req).await?;
let body: HashMap<String, AgentService> = res.body_json().await?;
Ok(body)
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn service_register(&self, service: AgentServiceRegistration) -> surf::Result<StatusCode> {
let opts = ServiceRegisterOpts::default();
let status = self.service_register_self(service, opts).await?;
Ok(status)
}
pub async fn service_register_opts(&self, service: AgentServiceRegistration, opts: ServiceRegisterOpts) -> surf::Result<StatusCode> {
let status = self.service_register_self(service, opts).await?;
Ok(status)
}
async fn service_register_self(&self, service: AgentServiceRegistration,
opts: ServiceRegisterOpts) -> surf::Result<StatusCode> {
if self.c.is_some() {
let client = self.c.unwrap();
let mut req = client.new_request(Method::Put,
"/v1/agent/service/register".to_string()).await?;
if opts.ReplaceExistingChecks == true {
req.set_query(&opts)?;
};
req.body_json(&service)?;
let client = surf::Client::new();
let res = client.send(req).await?;
Ok(res.status())
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn service_deregister(&self, service_id: String) -> surf::Result<StatusCode> {
if self.c.is_some() {
let client = self.c.unwrap();
let req = client.new_request(Method::Put,
format!("/v1/agent/service/deregister/{}", service_id)).await?;
let client = surf::Client::new();
let res = client.send(req).await?;
Ok(res.status())
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
pub async fn service_deregister_opts(&self, service_id: String, opts: api::QueryOptions) -> surf::Result<StatusCode> {
if self.c.is_some() {
let client = self.c.unwrap();
let mut req = client.new_request(Method::Put,
format!("/v1/agent/service/deregister/{}", service_id)).await?;
req.set_query(&opts)?;
let client = surf::Client::new();
let res = client.send(req).await?;
Ok(res.status())
} else {
Err(Error::from_str(StatusCode::BadRequest, "client init err"))
}
}
}
#[cfg(test)]
mod tests {
    // These tests issue real HTTP requests through the shared `AGENT` and
    // unwrap the results, so they fail unless an agent is reachable.
    use crate::agent;
    use crate::api;
    use async_std::task::block_on;

    /// Points the client at a fixed address, rebuilds `AGENT`, then fetches
    /// the agent's self description.
    #[test]
    fn test_my_self() {
        block_on(api::Client::set_config_address("http://0.0.0.0:8500"));
        block_on(agent::Agent::reload_client());
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let info = block_on(guard.my_self()).unwrap();
        println!("{:?}", info)
    }

    /// Fetches the host information.
    #[test]
    fn test_host() {
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let host = block_on(guard.host()).unwrap();
        println!("{:?}", host)
    }

    /// Lists all checks.
    #[test]
    fn test_checks() {
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let checks = block_on(guard.checks()).unwrap();
        println!("{:#?}", checks);
    }

    /// Lists all services.
    #[test]
    fn test_services() {
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let services = block_on(guard.services()).unwrap();
        println!("{:?}", services);
    }

    /// Registers service `10`, asking the agent to replace existing checks.
    #[test]
    fn test_service_register() {
        let registration = agent::AgentServiceRegistration {
            ID: Some("10".to_string()),
            Name: Some("test".to_string()),
            Address: Some("tttt".to_string()),
            Port: Some(8080),
            ..Default::default()
        };
        let register_opts = agent::ServiceRegisterOpts {
            ReplaceExistingChecks: true,
        };
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let status = block_on(guard.service_register_opts(registration, register_opts)).unwrap();
        println!("{}", status)
    }

    /// Deregisters service `10`.
    #[test]
    fn test_service_deregister() {
        let shared = agent::AGENT.clone();
        let guard = block_on(shared.read());
        let status = block_on(guard.service_deregister("10".to_string())).unwrap();
        println!("{}", status)
    }
}