use crate::grpc::scheduler::SchedulerClient;
use dragonfly_api::common::v2::{Build, CgroupCpu, CgroupMemory, Cpu, Disk, Host, Memory, Network};
use dragonfly_api::scheduler::v2::{AnnounceHostRequest, DeleteHostRequest};
use dragonfly_client_config::{
dfdaemon::{Config, HostType},
CARGO_PKG_RUSTC_VERSION, CARGO_PKG_VERSION, GIT_COMMIT_SHORT_HASH, INSTANCE_NAME,
};
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::Result;
use dragonfly_client_util::{container::is_running_in_container, shutdown, sysinfo::SystemMonitor};
use std::env;
use std::sync::Arc;
use sysinfo::System;
use tokio::sync::mpsc;
use tracing::{debug, error, info, instrument};
/// Announces the local host to the scheduler and keeps its registration fresh.
///
/// An initial announcement is sent when the announcer is constructed. `run` then
/// re-announces periodically and deletes the host from the scheduler on shutdown.
pub struct SchedulerAnnouncer {
    /// Daemon configuration. It provides the announce interval and the host,
    /// seed-peer, storage, upload and proxy settings.
    config: Arc<Config>,

    /// ID of the local host. It is sent in every announcement and in the delete
    /// request on shutdown.
    host_id: String,

    /// Client used to announce the host to, and delete it from, the scheduler.
    scheduler_client: Arc<SchedulerClient>,

    /// Source of the CPU, memory, network and disk statistics reported in each
    /// announcement.
    system_monitor: Arc<SystemMonitor>,

    /// Whether the daemon runs inside a container. It is evaluated once at
    /// construction and controls whether cgroup CPU/memory statistics are collected.
    is_running_in_container: bool,

    /// Shutdown signal. When it fires, `run` deletes the host and returns.
    shutdown: shutdown::Shutdown,

    /// Never read; it is only held so that it is dropped together with the announcer.
    /// NOTE(review): presumably the owner of the receiving end waits for all senders
    /// to be dropped to detect that shutdown has completed — confirm.
    _shutdown_complete: mpsc::UnboundedSender<()>,
}
impl SchedulerAnnouncer {
pub async fn new(
config: Arc<Config>,
host_id: String,
scheduler_client: Arc<SchedulerClient>,
system_monitor: Arc<SystemMonitor>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Result<Self> {
let announcer = Self {
config,
host_id,
scheduler_client,
system_monitor,
is_running_in_container: is_running_in_container(),
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
announcer
.scheduler_client
.init_announce_host(announcer.make_announce_host_request().await?)
.await?;
Ok(announcer)
}
pub async fn run(&self) {
let mut shutdown = self.shutdown.clone();
let mut interval = tokio::time::interval(self.config.scheduler.announce_interval);
loop {
tokio::select! {
_ = interval.tick() => {
let request = match self.make_announce_host_request().await {
Ok(request) => request,
Err(err) => {
error!("make announce host request failed: {}", err);
continue;
}
};
if let Err(err) = self.scheduler_client.announce_host(request).await {
error!("announce host to scheduler failed: {}", err);
};
}
_ = shutdown.recv() => {
if let Err(err) = self.scheduler_client.delete_host(DeleteHostRequest{
host_id: self.host_id.clone(),
}).await {
error!("delete host from scheduler failed: {}", err);
}
info!("announce to scheduler shutting down");
return
}
}
}
}
#[instrument(skip_all)]
async fn make_announce_host_request(&self) -> Result<AnnounceHostRequest> {
let host_type = if self.config.seed_peer.enable {
self.config.seed_peer.kind
} else {
HostType::Normal
};
let pid = std::process::id();
let cpu_stats = self.system_monitor.cpu.get_stats().await;
let process_cpu_stats = self.system_monitor.cpu.get_process_stats(pid).await;
let mut cpu = Cpu {
logical_count: cpu_stats.logical_core_count,
physical_count: cpu_stats.physical_core_count,
percent: cpu_stats.used_percent,
process_percent: process_cpu_stats.used_percent,
cgroup: None,
times: None,
};
if self.is_running_in_container {
cpu.cgroup = self
.system_monitor
.cpu
.get_cgroup_stats(pid)
.await
.map(|stats| CgroupCpu {
quota: stats.quota,
period: stats.period,
used_percent: stats.used_percent,
});
}
let memory_stats = self.system_monitor.memory.get_stats();
let process_memory_stats = self.system_monitor.memory.get_process_stats(pid);
let mut memory = Memory {
total: memory_stats.total,
free: memory_stats.free,
available: memory_stats.available,
used: memory_stats.usage,
used_percent: memory_stats.used_percent,
process_used_percent: process_memory_stats.used_percent,
cgroup: None,
};
if self.is_running_in_container {
memory.cgroup = self
.system_monitor
.memory
.get_cgroup_stats(pid)
.map(|stats| CgroupMemory {
limit: stats.limit,
usage: stats.usage,
used_percent: stats.used_percent,
});
}
let network_stats = self.system_monitor.network.get_stats().await;
debug!(
"network data: rx bandwidth {}/{} bps, tx bandwidth {}/{} bps",
network_stats.rx_bandwidth.unwrap_or(0),
network_stats.max_rx_bandwidth,
network_stats.tx_bandwidth.unwrap_or(0),
network_stats.max_tx_bandwidth
);
let network = Network {
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
max_rx_bandwidth: network_stats.max_rx_bandwidth,
rx_bandwidth: network_stats.rx_bandwidth,
max_tx_bandwidth: network_stats.max_tx_bandwidth,
tx_bandwidth: network_stats.tx_bandwidth,
..Default::default()
};
let disk_stats = self
.system_monitor
.disk
.get_stats(self.config.storage.dir.as_path())?;
let process_disk_stats = self.system_monitor.disk.get_process_stats(pid).await;
let disk = Disk {
total: disk_stats.total,
free: disk_stats.free,
used: disk_stats.usage,
used_percent: disk_stats.used_percent,
write_bandwidth: process_disk_stats.write_bandwidth,
read_bandwidth: process_disk_stats.read_bandwidth,
inodes_total: 0,
inodes_used: 0,
inodes_free: 0,
inodes_used_percent: 0.0,
};
let build = Build {
git_version: CARGO_PKG_VERSION.to_string(),
git_commit: Some(GIT_COMMIT_SHORT_HASH.to_string()),
go_version: None,
rust_version: Some(CARGO_PKG_RUSTC_VERSION.to_string()),
platform: None,
};
let host = Host {
id: self.host_id.to_string(),
r#type: host_type as u32,
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
port: self.config.upload.server.port as i32,
download_port: self.config.upload.server.port as i32,
os: env::consts::OS.to_string(),
platform: env::consts::OS.to_string(),
platform_family: env::consts::FAMILY.to_string(),
platform_version: System::os_version().unwrap_or_default(),
kernel_version: System::kernel_version().unwrap_or_default(),
cpu: Some(cpu),
memory: Some(memory),
network: Some(network),
disk: Some(disk),
build: Some(build),
scheduler_cluster_id: self.config.host.scheduler_cluster_id.unwrap_or_default(),
disable_shared: self.config.upload.disable_shared,
proxy_port: self.config.proxy.server.port as i32,
name: INSTANCE_NAME.clone(),
};
Ok(AnnounceHostRequest {
host: Some(host),
interval: Some(
prost_wkt_types::Duration::try_from(self.config.scheduler.announce_interval)
.or_err(ErrorType::ParseError)?,
),
})
}
}