use crate::event_store::client::gossip as wire;
use crate::grpc::HyperClient;
use crate::http::http_configure_auth;
use crate::request::build_request_metadata;
use crate::types::Endpoint;
use crate::{grpc, ClientSettings};
use serde::{Deserialize, Serialize};
use tonic::{Request, Status};
use uuid::Uuid;
pub async fn read(
settings: &ClientSettings,
client: &HyperClient,
uri: hyper::Uri,
) -> Result<Vec<MemberInfo>, Status> {
let inner = wire::gossip_client::GossipClient::with_origin(client, uri);
let mut req = Request::new(());
*req.metadata_mut() = build_request_metadata(settings, &Default::default());
let wire_members = inner.clone().read(req).await?.into_inner().members;
let mut members = Vec::with_capacity(wire_members.capacity());
for wire_member in wire_members {
let state = VNodeState::from_i32(wire_member.state)?;
let instance_id = if let Some(wire_uuid) = wire_member.instance_id {
wire_uuid.try_into().unwrap()
} else {
Uuid::nil()
};
let http_end_point = if let Some(endpoint) = wire_member.http_end_point {
let endpoint = Endpoint {
host: endpoint.address,
port: endpoint.port,
};
Ok(endpoint)
} else {
Err(Status::failed_precondition(
"MemberInfo endpoint must be defined",
))
}?;
let member = MemberInfo {
instance_id,
state,
is_alive: wire_member.is_alive,
time_stamp: wire_member.time_stamp,
http_end_point,
last_commit_position: 0,
writer_checkpoint: 0,
chaser_checkpoint: 0,
epoch_position: 0,
epoch_number: 0,
epoch_id: Default::default(),
node_priority: 0,
};
members.push(member);
}
Ok(members)
}
/// Reads the cluster membership from a node's HTTP gossip endpoint.
///
/// Issues a `GET` on `{handle.url()}/gossip`, with authentication applied by
/// `http_configure_auth` from `setts.default_user_name`, and converts every member of the
/// returned JSON document into a [`MemberInfo`].
///
/// TLS certificate validation is disabled when `setts.tls_verify_cert` is `false`.
///
/// # Errors
///
/// Returns an error when the HTTP client cannot be built, the request fails, the server
/// answers with a 4xx/5xx status, or the body is not a valid gossip document.
pub(crate) async fn http_read(
    setts: &ClientSettings,
    handle: grpc::Handle,
) -> Result<Vec<MemberInfo>, Box<dyn std::error::Error>> {
    let client = reqwest::Client::builder()
        .danger_accept_invalid_certs(!setts.tls_verify_cert)
        .build()?;

    let request = client.get(format!("{}/gossip", handle.url()));

    // Reject error statuses up front: otherwise a 401/500 reply would only surface as a
    // confusing JSON decoding failure.
    let resp = http_configure_auth(request, setts.default_user_name.as_ref())
        .send()
        .await?
        .error_for_status()?;

    let gossip = resp.json::<Gossip>().await?;

    let members: Vec<MemberInfo> = gossip
        .members
        .into_iter()
        .map(|member| MemberInfo {
            instance_id: member.instance_id,
            // Seconds since the Unix epoch.
            time_stamp: member.time_stamp.timestamp(),
            state: member.state,
            is_alive: member.is_alive,
            http_end_point: Endpoint {
                host: member.external_http_ip,
                // Lossless widening from `u16`.
                port: u32::from(member.external_http_port),
            },
            last_commit_position: member.last_commit_position,
            writer_checkpoint: member.writer_checkpoint,
            chaser_checkpoint: member.chaser_checkpoint,
            epoch_position: member.epoch_position,
            epoch_number: member.epoch_number,
            epoch_id: member.epoch_id,
            node_priority: member.node_priority,
        })
        .collect();

    Ok(members)
}
/// JSON document fetched from the HTTP gossip endpoint, as deserialized by `http_read`.
///
/// Field names are mapped to camelCase keys.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Gossip {
/// Members listed in the gossip document.
members: Vec<HttpMemberInfo>,
}
/// One cluster member as reported by gossip; the common result type of `read` (gRPC) and
/// `http_read` (HTTP).
///
/// `read` only takes `instance_id`, `time_stamp`, `state`, `is_alive` and `http_end_point`
/// from the response; it sets the remaining numeric fields to `0` and `epoch_id` to its
/// default value. `http_read` copies every field from the HTTP gossip document.
#[derive(Debug, Clone)]
pub struct MemberInfo {
/// Instance id of the member. `read` uses the nil UUID when the wire member has none.
pub instance_id: Uuid,
/// Gossip timestamp. `http_read` stores seconds since the Unix epoch (chrono
/// `DateTime::timestamp`); `read` stores the wire value unchanged.
/// NOTE(review): the unit of the gRPC wire value is not visible here — confirm both
/// sources are comparable.
pub time_stamp: i64,
/// State reported for the member.
pub state: VNodeState,
/// Liveness flag, copied as reported by gossip.
pub is_alive: bool,
/// Host and port of the member's HTTP endpoint.
pub http_end_point: Endpoint,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub last_commit_position: i64,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub writer_checkpoint: i64,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub chaser_checkpoint: i64,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub epoch_position: i64,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub epoch_number: i64,
/// Copied from the HTTP gossip document; the default `Uuid` when produced by `read`.
pub epoch_id: Uuid,
/// Copied from the HTTP gossip document; always `0` when produced by `read`.
pub node_priority: i64,
}
/// One entry of the `members` array in the HTTP gossip document.
///
/// JSON keys are the camelCase form of the field names, except for the two HTTP endpoint
/// fields, which carry explicit renames. `http_read` converts each entry into a
/// [`MemberInfo`]; the TCP address/port fields are not used by that conversion.
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct HttpMemberInfo {
/// Instance id of the member.
pub instance_id: Uuid,
/// Timestamp of the entry, deserialized as a UTC date-time.
pub time_stamp: chrono::DateTime<chrono::Utc>,
/// State reported for the member (PascalCase variant name in JSON).
pub state: VNodeState,
/// Liveness flag as reported by gossip.
pub is_alive: bool,
/// JSON key `internalTcpIp`.
pub internal_tcp_ip: String,
/// JSON key `internalTcpPort`.
pub internal_tcp_port: u16,
/// JSON key `internalSecureTcpPort`.
pub internal_secure_tcp_port: u16,
/// JSON key `externalTcpIp`.
pub external_tcp_ip: String,
/// JSON key `externalSecureTcpPort`.
pub external_secure_tcp_port: u16,
/// Read from the JSON key `httpEndPointIp`; becomes `MemberInfo::http_end_point.host`.
#[serde(rename = "httpEndPointIp")]
pub external_http_ip: String,
/// Read from the JSON key `httpEndPointPort`; becomes `MemberInfo::http_end_point.port`.
#[serde(rename = "httpEndPointPort")]
pub external_http_port: u16,
/// JSON key `lastCommitPosition`.
pub last_commit_position: i64,
/// JSON key `writerCheckpoint`.
pub writer_checkpoint: i64,
/// JSON key `chaserCheckpoint`.
pub chaser_checkpoint: i64,
/// JSON key `epochPosition`.
pub epoch_position: i64,
/// JSON key `epochNumber`.
pub epoch_number: i64,
/// JSON key `epochId`.
pub epoch_id: Uuid,
/// JSON key `nodePriority`.
pub node_priority: i64,
}
/// State of a cluster node as reported by gossip.
///
/// With serde the variants are written and read by their PascalCase names. The integer
/// codes accepted by `VNodeState::from_i32` (`0..=15`) follow the declaration order below,
/// so reordering variants must be mirrored there.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum VNodeState {
Initializing,
DiscoverLeader,
Unknown,
PreReplica,
CatchingUp,
// A node state named "Clone"; unrelated to the derived `Clone` trait.
Clone,
Follower,
PreLeader,
Leader,
Manager,
ShuttingDown,
Shutdown,
ReadOnlyLeaderLess,
PreReadOnlyReplica,
ReadOnlyReplica,
ResigningLeader,
}
impl VNodeState {
pub fn from_i32(value: i32) -> Result<Self, Status> {
match value {
0 => Ok(VNodeState::Initializing),
1 => Ok(VNodeState::DiscoverLeader),
2 => Ok(VNodeState::Unknown),
3 => Ok(VNodeState::PreReplica),
4 => Ok(VNodeState::CatchingUp),
5 => Ok(VNodeState::Clone),
6 => Ok(VNodeState::Follower),
7 => Ok(VNodeState::PreLeader),
8 => Ok(VNodeState::Leader),
9 => Ok(VNodeState::Manager),
10 => Ok(VNodeState::ShuttingDown),
11 => Ok(VNodeState::Shutdown),
12 => Ok(VNodeState::ReadOnlyLeaderLess),
13 => Ok(VNodeState::PreReadOnlyReplica),
14 => Ok(VNodeState::ReadOnlyReplica),
15 => Ok(VNodeState::ResigningLeader),
_ => Err(Status::out_of_range(format!(
"Unknown VNodeState value: {}",
value
))),
}
}
}