use kovan_map::HashMap;
use spire_proto::spiredb::cluster::{
Empty, StoreState, cluster_service_client::ClusterServiceClient,
};
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::Channel;
/// Pause inserted after every refresh attempt (successful or not) by the
/// background loop in `ClusterTopology::start_refresh_task`.
const REFRESH_INTERVAL: Duration = Duration::from_secs(5);
use parking_lot::RwLock;
/// Cached view of the cluster's stores and current leader.
///
/// The cache starts empty and is filled by `refresh`, which queries the
/// cluster service through `cluster_client`.
pub struct ClusterTopology {
/// gRPC client used by `refresh` to call `list_stores`; cloned per call.
cluster_client: ClusterServiceClient<Channel>,
/// Store id -> store info; `refresh` inserts stores reported as `StoreUp`
/// and removes stores reported in any other state.
stores: Arc<HashMap<u64, StoreInfo>>,
/// Leader recorded by the most recent refresh that reported one; `None`
/// until a refresh has seen a store flagged as leader.
leader: RwLock<Option<LeaderInfo>>,
}
/// Identity and address of a single store, as cached by `ClusterTopology`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct StoreInfo {
/// Store id; also the key under which this entry is cached.
pub id: u64,
/// Address string copied verbatim from the `list_stores` response.
pub address: String,
}
/// The store that a topology refresh saw flagged as leader while in the
/// `StoreUp` state.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct LeaderInfo {
/// Address string copied verbatim from the leader's `list_stores` entry.
pub address: String,
/// Id of the store that was flagged as leader.
pub store_id: u64,
}
impl std::fmt::Debug for ClusterTopology {
    /// Formats the topology as `ClusterTopology { stores_count, leader }`.
    ///
    /// The gRPC client is left out of the output, and only the number of
    /// cached stores is printed rather than each entry.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cached_stores = self.stores.len();
        // The read guard is kept alive until the struct has been fully written.
        let leader_guard = self.leader.read();

        let mut builder = f.debug_struct("ClusterTopology");
        builder.field("stores_count", &cached_stores);
        builder.field("leader", &leader_guard);
        builder.finish()
    }
}
impl ClusterTopology {
    /// Creates an empty topology backed by `cluster_client`.
    ///
    /// No RPC is issued here: the store cache is empty and the leader is
    /// `None` until [`refresh`](Self::refresh) succeeds.
    pub fn new(cluster_client: ClusterServiceClient<Channel>) -> Self {
        Self {
            cluster_client,
            stores: Arc::new(HashMap::new()),
            leader: RwLock::new(None),
        }
    }

    /// Returns a copy of the most recently recorded leader, or `None` if no
    /// refresh has reported one yet.
    pub fn get_leader_address(&self) -> Option<LeaderInfo> {
        self.leader.read().clone()
    }

    /// Spawns a detached Tokio task that calls [`refresh`](Self::refresh) in
    /// an endless loop, sleeping `REFRESH_INTERVAL` after every attempt.
    ///
    /// Refresh failures are logged at `warn` level and do not stop the loop.
    /// The task owns the `Arc` and the loop has no exit, so the topology is
    /// kept alive for as long as the task runs.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn start_refresh_task(self: Arc<Self>) {
        tokio::spawn(async move {
            loop {
                if let Err(e) = self.refresh().await {
                    log::warn!("Failed to refresh cluster topology: {}", e);
                }
                tokio::time::sleep(REFRESH_INTERVAL).await;
            }
        });
    }

    /// Fetches the store list from the cluster service and synchronises the
    /// cache with it.
    ///
    /// After a successful call the cache holds exactly the stores that the
    /// response reported as `StoreUp`: those are inserted or overwritten,
    /// and every other cached id is dropped. That covers stores listed in
    /// another state as well as stores missing from the response altogether.
    ///
    /// The leader is overwritten only when an up store is flagged as leader.
    /// If none is, the previously recorded leader is kept, and a warning is
    /// logged when at least one store is up.
    ///
    /// # Errors
    ///
    /// Returns the `tonic::Status` of the failed `list_stores` call. In that
    /// case neither the store cache nor the leader is modified.
    pub async fn refresh(&self) -> Result<(), tonic::Status> {
        let mut client = self.cluster_client.clone();
        let store_list = client.list_stores(Empty {}).await?.into_inner();

        let prev_count = self.stores.len();
        // Ids of every store the response reports as up. Using a set means a
        // duplicated entry in the response is only counted once.
        let mut active_ids = std::collections::HashSet::new();
        let mut found_leader: Option<LeaderInfo> = None;

        for store in store_list.stores {
            if store.state != StoreState::StoreUp as i32 {
                // Not up: it is left out of `active_ids` and pruned below.
                continue;
            }
            if store.is_leader {
                found_leader = Some(LeaderInfo {
                    address: store.address.clone(),
                    store_id: store.id,
                });
            }
            log::debug!(
                "Store {} at {} (leader={})",
                store.id,
                store.address,
                store.is_leader
            );
            active_ids.insert(store.id);
            self.stores.insert(
                store.id,
                StoreInfo {
                    id: store.id,
                    address: store.address,
                },
            );
        }

        // Drop every cached store that is not currently up. Without this, a
        // store that vanished from the response would be served forever.
        // The ids are collected first so the map is not modified while it is
        // being iterated.
        let stale_ids: Vec<u64> = self
            .stores
            .iter()
            .map(|(_, info)| info.id)
            .filter(|id| !active_ids.contains(id))
            .collect();
        for id in stale_ids {
            self.stores.remove(&id);
        }

        let count = active_ids.len();
        if let Some(leader) = found_leader {
            *self.leader.write() = Some(leader);
        } else if count > 0 {
            log::warn!("No PD leader reported in topology refresh");
        }
        if count != prev_count {
            log::info!("Cluster topology updated: {} active stores", count);
        }
        Ok(())
    }

    /// Returns the cached address of `store_id`, or `None` if that store is
    /// not in the cache.
    pub fn get_store_address(&self, store_id: u64) -> Option<String> {
        self.stores.get(&store_id).map(|s| s.address.clone())
    }

    /// Returns a snapshot copy of every cached store.
    #[allow(dead_code)]
    pub fn all_stores(&self) -> Vec<StoreInfo> {
        self.stores.iter().map(|(_, v)| v.clone()).collect()
    }

    /// Returns the number of stores currently in the cache.
    pub fn store_count(&self) -> usize {
        self.stores.len()
    }
}