use crate::conf::{ClientConfig, ConRegConfig, DiscoveryConfig};
use crate::network::HTTP;
use crate::protocol::Instance;
use crate::protocol::request::{GetInstancesReq, HeartbeatReq, RegisterReq};
use crate::protocol::response::HeartbeatResult;
use dashmap::DashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct DiscoveryClient {
service_id: String,
client: ClientConfig,
config: DiscoveryConfig,
}
impl DiscoveryClient {
pub(crate) fn new(config: &ConRegConfig) -> Self {
Self {
service_id: config.service_id.clone(),
client: config.client.clone(),
config: config.discovery.clone().unwrap(),
}
}
pub(crate) async fn register(&self) -> anyhow::Result<Instance> {
let req = RegisterReq {
namespace_id: self.config.namespace.clone(),
service_id: self.service_id.clone(),
ip: self.client.address.clone(),
port: self.client.port,
meta: self.config.meta.clone(),
};
let instance = HTTP
.post::<Instance>(
&self
.config
.server_addr
.build_url("/api/discovery/instance/register")?,
req,
)
.await?;
log::info!("register instance with service id: {}", self.service_id);
Ok(instance)
}
pub(crate) async fn fetch_instances(&self, service_id: &str) -> anyhow::Result<Vec<Instance>> {
let req = GetInstancesReq {
namespace_id: self.config.namespace.clone(),
service_id: service_id.to_string(),
};
HTTP.get::<Vec<Instance>>(
&self
.config
.server_addr
.build_url("/api/discovery/instance/available")?,
req,
match &self.config.auth_token {
Some(token) => Some(vec![(crate::NS_TOKEN_HEADER, token)]),
None => None,
},
)
.await
}
async fn heartbeat(&self) -> anyhow::Result<HeartbeatResult> {
let req = HeartbeatReq {
namespace_id: self.config.namespace.clone(),
service_id: self.service_id.to_string(),
instance_id: self.client.gen_instance_id(),
};
HTTP.post::<HeartbeatResult>(
&self
.config
.server_addr
.build_url("/api/discovery/heartbeat")?,
req,
)
.await
}
}
#[derive(Debug)]
pub struct Discovery {
services: Arc<DashMap<String, Vec<Instance>>>,
client: DiscoveryClient,
}
impl Discovery {
pub(crate) async fn new(client: DiscoveryClient) -> Self {
let discovery = Discovery {
services: Arc::new(DashMap::new()),
client,
};
discovery.start_fetch_task();
discovery.start_heartbeat();
discovery
}
fn start_fetch_task(&self) {
log::info!("start service instances fetch task");
let client = Arc::new(self.client.clone());
let services = self.services.clone();
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(Duration::from_secs(30));
loop {
interval_timer.tick().await;
let service_ids: Vec<String> =
services.iter().map(|entry| entry.key().clone()).collect();
for service_id in service_ids {
match Self::fetch_instances_(&client, &service_id).await {
Ok(instances) => {
services.insert(service_id, instances);
}
Err(e) => {
log::error!(
"fetch service instance error, service id: {}, error: {}",
service_id,
e
);
}
}
}
}
});
}
fn start_heartbeat(&self) {
let client = Arc::new(self.client.clone());
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(Duration::from_secs(5));
loop {
interval_timer.tick().await;
log::debug!("ping");
match client.heartbeat().await {
Ok(res) => match res {
HeartbeatResult::Ok => {
log::debug!("pong");
}
HeartbeatResult::NoInstanceFound => {
log::warn!("no instance found, try re-register");
if let Err(e) = client.register().await {
log::error!("register error:{}", e);
}
}
HeartbeatResult::Rejected => {
log::warn!("heartbeat rejected");
}
HeartbeatResult::Unknown => {
log::error!("Unknown heartbeat result");
}
},
Err(e) => {
log::error!("heartbeat error: {}", e);
}
}
}
});
}
pub(crate) async fn get_instances(&self, service_id: &str) -> Vec<Instance> {
match self.services.get(service_id) {
Some(instances) => instances.clone(),
None => self.fetch_instances(service_id).await.unwrap_or_else(|e| {
log::error!("Failed to fetch instances: {}", e);
vec![]
}),
}
}
async fn fetch_instances(&self, service_id: &str) -> anyhow::Result<Vec<Instance>> {
let instances = self.client.fetch_instances(service_id).await?;
self.services
.insert(service_id.to_string(), instances.clone());
Ok(instances)
}
async fn fetch_instances_(
client: &DiscoveryClient,
service_id: &str,
) -> anyhow::Result<Vec<Instance>> {
let instances = client.fetch_instances(service_id).await?;
Ok(instances)
}
}