use crate::grpc::health::HealthClient;
use crate::grpc::manager::ManagerClient;
use dragonfly_api::manager::v2::{
ListSchedulersRequest, ListSchedulersResponse, Scheduler, SourceType,
};
use dragonfly_client_config::{dfdaemon::Config, CARGO_PKG_VERSION, GIT_COMMIT_SHORT_HASH};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::net::format_url;
use dragonfly_client_util::shutdown;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock};
use tonic_health::pb::health_check_response::ServingStatus;
use tracing::{debug, error, info, instrument};
use url::Url;
/// Client-side settings carried by a scheduler cluster.
///
/// `Dynconfig::refresh` deserializes this from the JSON bytes found in the
/// `client_config` field of the available scheduler's cluster. Because of
/// `#[serde(default)]`, fields missing from the input take their `Default` value.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterClientConfig {
/// Optional block list; `None` when the input does not provide one.
pub block_list: Option<SchedulerClusterConfigBlockList>,
}
/// Seed-client-side settings carried by a scheduler cluster.
///
/// `Dynconfig::refresh` deserializes this from the JSON bytes found in the
/// `seed_client_config` field of the available scheduler's cluster. Because of
/// `#[serde(default)]`, fields missing from the input take their `Default` value.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterSeedClientConfig {
/// Optional block list; `None` when the input does not provide one.
pub block_list: Option<SchedulerClusterConfigBlockList>,
}
/// Block lists grouped by task kind.
///
/// Every entry is optional and independent of the others. How the lists are
/// enforced is not visible in this file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigBlockList {
/// Block list for tasks; this kind only has a download list.
pub task: Option<SchedulerClusterConfigTaskBlockList>,
/// Block list for persistent tasks (download and upload lists).
pub persistent_task: Option<SchedulerClusterConfigPersistentTaskBlockList>,
/// Block list for persistent cache tasks (download and upload lists).
pub persistent_cache_task: Option<SchedulerClusterConfigPersistentCacheTaskBlockList>,
}
/// Block list applying to tasks.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigTaskBlockList {
/// Criteria for task downloads; `None` when not provided.
pub download: Option<SchedulerClusterConfigDownloadBlockList>,
}
/// Block list applying to persistent tasks.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigPersistentTaskBlockList {
/// Criteria for persistent task downloads; `None` when not provided.
pub download: Option<SchedulerClusterConfigDownloadBlockList>,
/// Criteria for persistent task uploads; `None` when not provided.
pub upload: Option<SchedulerClusterConfigUploadBlockList>,
}
/// Block list applying to persistent cache tasks.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigPersistentCacheTaskBlockList {
/// Criteria for persistent cache task downloads; `None` when not provided.
pub download: Option<SchedulerClusterConfigDownloadBlockList>,
/// Criteria for persistent cache task uploads; `None` when not provided.
pub upload: Option<SchedulerClusterConfigUploadBlockList>,
}
/// Criteria of a download block list.
///
/// NOTE(review): presumably a download matching any of these criteria is
/// rejected; the matching logic is not in this file, confirm against the consumer.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigDownloadBlockList {
/// Application names on the list; `None` when not provided.
pub applications: Option<Vec<String>>,
/// URL patterns on the list, (de)serialized through `serde_regex`.
/// Empty when the field is absent from the input (struct-level `#[serde(default)]`).
#[serde(with = "serde_regex")]
pub urls: Vec<Regex>,
/// Tags on the list; `None` when not provided.
pub tags: Option<Vec<String>>,
/// Priority values on the list; `None` when not provided.
pub priorities: Option<Vec<i32>>,
}
/// Criteria of an upload block list.
///
/// NOTE(review): presumably an upload matching any of these criteria is
/// rejected; the matching logic is not in this file, confirm against the consumer.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SchedulerClusterConfigUploadBlockList {
/// Application names on the list; `None` when not provided.
pub applications: Option<Vec<String>>,
/// URL patterns on the list, (de)serialized through `serde_regex`.
/// Empty when the field is absent from the input (struct-level `#[serde(default)]`).
#[serde(with = "serde_regex")]
pub urls: Vec<Regex>,
/// Tags on the list; `None` when not provided.
pub tags: Option<Vec<String>>,
}
/// Snapshot of the dynamic configuration published by `Dynconfig::refresh`.
///
/// Starts out as `Data::default()` and is overwritten as a whole on every
/// successful refresh.
#[derive(Default)]
pub struct Data {
/// Raw response of the manager's list-schedulers call.
pub schedulers: ListSchedulersResponse,
/// Schedulers that answered the health check with `Serving`; all of them
/// belong to the same scheduler cluster.
pub available_schedulers: Vec<Scheduler>,
/// Cluster id of the first available scheduler; `None` until a refresh succeeds.
pub available_scheduler_cluster_id: Option<u64>,
/// Client config of the available scheduler cluster; `None` when its JSON
/// could not be deserialized.
pub client_config: Option<SchedulerClusterClientConfig>,
/// Seed client config of the available scheduler cluster; `None` when its
/// JSON could not be deserialized.
pub seed_client_config: Option<SchedulerClusterSeedClientConfig>,
}
/// Holder of the dynamic configuration obtained from the manager.
///
/// `refresh` pulls the scheduler list, health-checks it and stores the result
/// in `data`; `run` repeats that on a fixed interval until shutdown.
pub struct Dynconfig {
/// Latest snapshot written by `refresh`, guarded by an async read-write lock.
pub data: RwLock<Data>,
/// Static dfdaemon configuration (host identity, refresh interval, scheduler TLS settings).
config: Arc<Config>,
/// Manager gRPC client used to list schedulers.
manager_client: Arc<ManagerClient>,
/// Taken with `try_lock` in `refresh` so that overlapping refreshes are skipped, not queued.
mutex: Mutex<()>,
/// Shutdown signal; `run` clones it and returns once it is received.
shutdown: shutdown::Shutdown,
/// Never read in this file.
/// NOTE(review): presumably kept so that dropping `Dynconfig` closes the channel
/// and tells the receiver that shutdown completed; confirm against the receiver.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
impl Dynconfig {
pub async fn new(
config: Arc<Config>,
manager_client: Arc<ManagerClient>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Result<Self> {
let dc = Dynconfig {
config,
data: RwLock::new(Data::default()),
manager_client,
mutex: Mutex::new(()),
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
dc.refresh().await?;
Ok(dc)
}
/// Refreshes the dynamic configuration periodically until shutdown is signalled.
///
/// A refresh is attempted on every tick of an interval built from
/// `config.dynconfig.refresh_interval`. A failed refresh is only logged; the
/// loop keeps running. The method returns once the shutdown signal arrives.
pub async fn run(&self) {
    let mut ticker = tokio::time::interval(self.config.dynconfig.refresh_interval);
    let mut shutdown_signal = self.shutdown.clone();

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                if let Err(err) = self.refresh().await {
                    error!("refresh dynconfig failed: {}", err);
                } else {
                    debug!("refresh dynconfig success");
                }
            }
            _ = shutdown_signal.recv() => {
                info!("dynconfig server shutting down");
                return;
            }
        }
    }
}
#[instrument(skip_all)]
pub async fn refresh(&self) -> Result<()> {
let Ok(_guard) = self.mutex.try_lock() else {
debug!("refresh is already running");
return Ok(());
};
let schedulers = self.list_schedulers().await?;
let available_schedulers = self
.get_available_schedulers(&schedulers.schedulers)
.await?;
let Some(available_scheduler) = available_schedulers.first() else {
return Err(Error::AvailableSchedulersNotFound);
};
let scheduler_cluster_id = available_scheduler.scheduler_cluster_id;
let Some(scheduler_cluster) = &available_scheduler.scheduler_cluster else {
return Err(Error::AvailableSchedulersNotFound);
};
let client_config = match serde_json::from_slice::<SchedulerClusterClientConfig>(
&scheduler_cluster.client_config,
) {
Ok(config) => Some(config),
Err(err) => {
error!("failed to deserialize client config: {}", err);
None
}
};
let seed_client_config = match serde_json::from_slice::<SchedulerClusterSeedClientConfig>(
&scheduler_cluster.seed_client_config,
) {
Ok(config) => Some(config),
Err(err) => {
error!("failed to deserialize seed client config: {}", err);
None
}
};
let mut data = self.data.write().await;
data.schedulers = schedulers;
data.available_schedulers = available_schedulers;
data.available_scheduler_cluster_id = Some(scheduler_cluster_id);
data.client_config = client_config;
data.seed_client_config = seed_client_config;
Ok(())
}
#[instrument(skip_all)]
async fn list_schedulers(&self) -> Result<ListSchedulersResponse> {
let source_type = if self.config.seed_peer.enable {
SourceType::SeedPeerSource.into()
} else {
SourceType::PeerSource.into()
};
self.manager_client
.list_schedulers(ListSchedulersRequest {
source_type,
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
version: CARGO_PKG_VERSION.to_string(),
commit: GIT_COMMIT_SHORT_HASH.to_string(),
scheduler_cluster_id: self.config.host.scheduler_cluster_id.unwrap_or(0),
})
.await
}
#[instrument(skip_all)]
async fn get_available_schedulers(&self, schedulers: &[Scheduler]) -> Result<Vec<Scheduler>> {
let mut available_schedulers: Vec<Scheduler> = Vec::new();
let mut available_scheduler_cluster_id: Option<u64> = None;
for scheduler in schedulers {
if let Some(scheduler_cluster_id) = available_scheduler_cluster_id {
if scheduler.scheduler_cluster_id != scheduler_cluster_id {
continue;
}
}
let addr = format_url(
"http",
IpAddr::from_str(&scheduler.ip)?,
scheduler.port as u16,
);
let domain_name = Url::parse(addr.as_str())?
.host_str()
.ok_or(Error::InvalidParameter)
.inspect_err(|_err| {
error!("invalid address: {}", addr);
})?
.to_string();
let health_client = match HealthClient::new(
&addr,
self.config
.scheduler
.load_client_tls_config(domain_name.as_str())
.await?,
)
.await
{
Ok(client) => client,
Err(err) => {
error!(
"create health client for scheduler {}:{} failed: {}",
scheduler.ip, scheduler.port, err
);
continue;
}
};
match health_client.check().await {
Ok(resp) => {
if resp.status == ServingStatus::Serving as i32 {
available_schedulers.push(scheduler.clone());
available_scheduler_cluster_id = Some(scheduler.scheduler_cluster_id);
}
}
Err(err) => {
error!("check scheduler health failed: {}", err);
continue;
}
}
}
Ok(available_schedulers)
}
}