use nvml_wrapper::error::NvmlError;
use once_cell::sync::Lazy;
use paste::paste;
use std::collections::HashSet;
use std::future::Future;
use std::net::TcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use zeusd::auth::SigningKeyData;
use zeusd::config::ApiGroup;
use zeusd::devices::cpu::power::start_cpu_poller;
use zeusd::devices::cpu::{CpuManagementTasks, CpuManager, PackageInfo};
use zeusd::devices::gpu::power::start_gpu_poller;
use zeusd::devices::gpu::{GpuManagementTasks, GpuManager};
use zeusd::error::ZeusdError;
use zeusd::routes::DiscoveryInfo;
use zeusd::startup::{init_tracing, start_server_tcp, EnabledGroups, ServerState};
/// Number of fake GPUs exposed by the test server.
///
/// A `const` rather than a `static` so it can be used in const contexts (array lengths, patterns).
pub const NUM_GPUS: u32 = 4;
/// Number of fake CPU packages exposed by the test server.
const NUM_CPUS: usize = 1;
/// One-time tracing setup shared by every test in this binary.
///
/// When the `TEST_LOG` environment variable is set, log output goes to stdout;
/// otherwise it is written to `std::io::sink` and discarded.
static TRACING: Lazy<()> = Lazy::new(|| {
    let log_to_stdout = std::env::var("TEST_LOG").is_ok();
    if !log_to_stdout {
        init_tracing(std::io::sink).expect("Failed to initialize tracing");
        return;
    }
    init_tracing(std::io::stdout).expect("Failed to initialize tracing");
});
/// Fake GPU that forwards every control command it receives to a [`TestGpuObserver`].
#[derive(Clone)]
pub struct TestGpu {
    /// Receives every persistence-mode value passed to `set_persistence_mode`.
    persistence_mode_tx: UnboundedSender<bool>,
    /// Receives every accepted power limit.
    power_limit_tx: UnboundedSender<u32>,
    /// Receives `(min, max)` GPU clock locks; a reset is sent as `(0, 0)`.
    gpu_locked_clocks_tx: UnboundedSender<(u32, u32)>,
    /// Receives `(min, max)` memory clock locks; a reset is sent as `(0, 0)`.
    mem_locked_clocks_tx: UnboundedSender<(u32, u32)>,
    /// Inclusive `(min, max)` power limits this fake accepts
    /// (presumably milliwatts, like NVML — confirm).
    valid_power_limit_range: (u32, u32),
}

/// Receiving ends paired with a [`TestGpu`]'s command channels.
pub struct TestGpuObserver {
    persistence_mode_rx: UnboundedReceiver<bool>,
    power_limit_rx: UnboundedReceiver<u32>,
    gpu_locked_clocks_rx: UnboundedReceiver<(u32, u32)>,
    mem_locked_clocks_rx: UnboundedReceiver<(u32, u32)>,
}

impl TestGpu {
    /// Creates a fake GPU and the observer that sees every command sent to it.
    ///
    /// Always returns `Ok`.
    fn init() -> Result<(Self, TestGpuObserver), ZeusdError> {
        use tokio::sync::mpsc::unbounded_channel;

        let (persistence_tx, persistence_rx) = unbounded_channel();
        let (power_tx, power_rx) = unbounded_channel();
        let (gpu_clocks_tx, gpu_clocks_rx) = unbounded_channel();
        let (mem_clocks_tx, mem_clocks_rx) = unbounded_channel();

        let observer = TestGpuObserver {
            persistence_mode_rx: persistence_rx,
            power_limit_rx: power_rx,
            gpu_locked_clocks_rx: gpu_clocks_rx,
            mem_locked_clocks_rx: mem_clocks_rx,
        };

        Ok((
            TestGpu {
                persistence_mode_tx: persistence_tx,
                power_limit_tx: power_tx,
                gpu_locked_clocks_tx: gpu_clocks_tx,
                mem_locked_clocks_tx: mem_clocks_tx,
                valid_power_limit_range: (100_000, 300_000),
            },
            observer,
        ))
    }
}
/// Every setter forwards its arguments to the paired [`TestGpuObserver`]; the read methods
/// return fixed values and never send anything.
///
/// The setters panic if the observer (the receiving half) has been dropped, because
/// `UnboundedSender::send` fails once the receiver is closed.
impl GpuManager for TestGpu {
    /// Always reports [`NUM_GPUS`] devices.
    fn device_count() -> Result<u32, ZeusdError> {
        Ok(NUM_GPUS)
    }

    fn set_persistence_mode(&mut self, enabled: bool) -> Result<(), ZeusdError> {
        self.persistence_mode_tx
            .send(enabled)
            .expect("TestGpuObserver (persistence mode) was dropped");
        Ok(())
    }

    /// Records `power_limit` if it lies within `valid_power_limit_range` (inclusive).
    ///
    /// # Errors
    /// Returns `NvmlError::InvalidArg` (converted to `ZeusdError`) for out-of-range limits,
    /// without recording anything.
    fn set_power_management_limit(&mut self, power_limit: u32) -> Result<(), ZeusdError> {
        let (min_limit, max_limit) = self.valid_power_limit_range;
        if !(min_limit..=max_limit).contains(&power_limit) {
            return Err(ZeusdError::from(NvmlError::InvalidArg));
        }
        self.power_limit_tx
            .send(power_limit)
            .expect("TestGpuObserver (power limit) was dropped");
        Ok(())
    }

    fn set_gpu_locked_clocks(
        &mut self,
        min_clock_mhz: u32,
        max_clock_mhz: u32,
    ) -> Result<(), ZeusdError> {
        self.gpu_locked_clocks_tx
            .send((min_clock_mhz, max_clock_mhz))
            .expect("TestGpuObserver (GPU locked clocks) was dropped");
        Ok(())
    }

    /// Records a reset as the sentinel pair `(0, 0)`.
    fn reset_gpu_locked_clocks(&mut self) -> Result<(), ZeusdError> {
        self.gpu_locked_clocks_tx
            .send((0, 0))
            .expect("TestGpuObserver (GPU locked clocks) was dropped");
        Ok(())
    }

    fn set_mem_locked_clocks(
        &mut self,
        min_clock_mhz: u32,
        max_clock_mhz: u32,
    ) -> Result<(), ZeusdError> {
        self.mem_locked_clocks_tx
            .send((min_clock_mhz, max_clock_mhz))
            .expect("TestGpuObserver (memory locked clocks) was dropped");
        Ok(())
    }

    /// Records a reset as the sentinel pair `(0, 0)`.
    fn reset_mem_locked_clocks(&mut self) -> Result<(), ZeusdError> {
        self.mem_locked_clocks_tx
            .send((0, 0))
            .expect("TestGpuObserver (memory locked clocks) was dropped");
        Ok(())
    }

    /// Constant fake power reading.
    fn get_instant_power_mw(&mut self) -> Result<u32, ZeusdError> {
        Ok(150_000)
    }

    /// Constant fake cumulative energy reading.
    fn get_total_energy_consumption(&mut self) -> Result<u64, ZeusdError> {
        Ok(500_000)
    }
}
/// Fake CPU package whose energy readings are supplied by the test through a
/// [`TestCpuInjector`].
pub struct TestCpu {
    /// Queue of CPU energy readings to hand out, oldest first.
    pub cpu: UnboundedReceiver<u64>,
    /// Queue of DRAM energy readings to hand out, oldest first.
    pub dram: UnboundedReceiver<u64>,
}

/// Sending halves used by tests to push energy readings into a [`TestCpu`].
pub struct TestCpuInjector {
    pub cpu: UnboundedSender<u64>,
    pub dram: UnboundedSender<u64>,
}

impl TestCpu {
    /// Creates a fake CPU and its injector. The index is ignored and the call always
    /// returns `Ok`.
    fn init(_index: usize) -> Result<(Self, TestCpuInjector), ZeusdError> {
        use tokio::sync::mpsc::unbounded_channel;

        let (cpu_tx, cpu_rx) = unbounded_channel();
        let (dram_tx, dram_rx) = unbounded_channel();

        let injector = TestCpuInjector {
            cpu: cpu_tx,
            dram: dram_tx,
        };
        let fake_cpu = TestCpu {
            cpu: cpu_rx,
            dram: dram_rx,
        };
        Ok((fake_cpu, injector))
    }
}
impl CpuManager for TestCpu {
fn device_count() -> Result<usize, ZeusdError> {
Ok(1)
}
fn get_available_fields(
_index: usize,
) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
Ok((
Arc::new(PackageInfo {
index: _index,
name: "package-0".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj",
),
max_energy_uj: 1000000,
}),
Some(Arc::new(PackageInfo {
index: _index,
name: "dram".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj",
),
max_energy_uj: 1000000,
})),
))
}
fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
Ok(self.cpu.try_recv().ok().unwrap())
}
fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
Ok(self.dram.try_recv().ok().unwrap())
}
fn is_dram_available(&self) -> bool {
true
}
}
/// Fake CPU package whose energy counters grow by a fixed step on every read.
///
/// This gives the CPU power poller a predictable, linearly increasing energy series.
pub struct PowerTestCpu {
    /// Value the next CPU energy read returns, in microjoules.
    cpu_energy_uj: u64,
    /// Value the next DRAM energy read returns, in microjoules.
    dram_energy_uj: u64,
    /// Amount the CPU counter grows after each read, in microjoules.
    cpu_increment_uj: u64,
    /// Amount the DRAM counter grows after each read, in microjoules.
    dram_increment_uj: u64,
}

/// Per-read CPU energy increment used for `PowerTestCpu` in `TestApp`, in microjoules.
pub const POWER_TEST_CPU_INCREMENT_UJ: u64 = 10_000;
/// Per-read DRAM energy increment used for `PowerTestCpu` in `TestApp`, in microjoules.
pub const POWER_TEST_DRAM_INCREMENT_UJ: u64 = 5_000;
/// Polling frequency (Hz) passed to the CPU power poller in `TestApp`.
pub const POWER_TEST_POLL_HZ: u32 = 10;

impl PowerTestCpu {
    /// Creates a fake package whose CPU and DRAM counters both start at zero.
    fn new(cpu_increment_uj: u64, dram_increment_uj: u64) -> Self {
        let starting_energy_uj = 0;
        Self {
            cpu_increment_uj,
            dram_increment_uj,
            cpu_energy_uj: starting_energy_uj,
            dram_energy_uj: starting_energy_uj,
        }
    }
}
impl CpuManager for PowerTestCpu {
fn device_count() -> Result<usize, ZeusdError> {
Ok(1)
}
fn get_available_fields(
index: usize,
) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
Ok((
Arc::new(PackageInfo {
index,
name: "package-0".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj",
),
max_energy_uj: 1_000_000,
}),
Some(Arc::new(PackageInfo {
index,
name: "dram".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj",
),
max_energy_uj: 1_000_000,
})),
))
}
fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
let val = self.cpu_energy_uj;
self.cpu_energy_uj += self.cpu_increment_uj;
Ok(val)
}
fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
let val = self.dram_energy_uj;
self.dram_energy_uj += self.dram_increment_uj;
Ok(val)
}
fn is_dram_available(&self) -> bool {
true
}
}
pub fn start_gpu_test_tasks() -> anyhow::Result<(GpuManagementTasks, Vec<TestGpuObserver>)> {
let mut gpus = Vec::with_capacity(4);
let mut observers = Vec::with_capacity(4);
for _ in 0..4 {
let (gpu, observer) = TestGpu::init()?;
gpus.push(gpu);
observers.push(observer);
}
let tasks = GpuManagementTasks::start(gpus)?;
Ok((tasks, observers))
}
pub fn start_cpu_test_tasks() -> anyhow::Result<(CpuManagementTasks, Vec<TestCpuInjector>)> {
let mut cpus = Vec::with_capacity(NUM_CPUS);
let mut injectors = Vec::with_capacity(NUM_CPUS);
for i in 0..NUM_CPUS {
let (cpu, cpu_injector) = TestCpu::init(i)?;
cpus.push(cpu);
injectors.push(cpu_injector)
}
let tasks = CpuManagementTasks::start(cpus)?;
Ok((tasks, injectors))
}
/// A request payload that the test harness knows how to route to a `zeusd` endpoint.
///
/// `TestApp::send` serializes the payload into the URL query string.
pub trait ZeusdRequest: serde::Serialize {
    /// Full URL of the endpoint handling this request on `test_app`'s server.
    fn build_url(test_app: &TestApp) -> String;

    /// HTTP method used for this request. Defaults to `POST`.
    fn http_method() -> reqwest::Method {
        reqwest::Method::POST
    }
}
macro_rules! impl_zeusd_request_gpu {
($api:ident) => {
paste! {
impl ZeusdRequest for zeusd::routes::gpu::[<$api:camel>] {
fn build_url(app: &TestApp) -> String {
format!(
"http://127.0.0.1:{}/gpu/{}",
app.port, stringify!([<$api:snake>]),
)
}
}
}
};
}
macro_rules! impl_zeusd_request_cpu {
($api:ident) => {
paste! {
impl ZeusdRequest for zeusd::routes::cpu::[<$api:camel>] {
fn build_url(app: &TestApp) -> String {
format!(
"http://127.0.0.1:{}/cpu/{}",
app.port, stringify!([<$api:snake>]),
)
}
fn http_method() -> reqwest::Method {
reqwest::Method::GET
}
}
}
};
}
impl_zeusd_request_gpu!(SetPersistenceMode);
impl_zeusd_request_gpu!(SetPowerLimit);
impl_zeusd_request_gpu!(SetGpuLockedClocks);
impl_zeusd_request_gpu!(ResetGpuLockedClocks);
impl_zeusd_request_gpu!(SetMemLockedClocks);
impl_zeusd_request_gpu!(ResetMemLockedClocks);
impl_zeusd_request_cpu!(GetCumulativeEnergy);
pub struct TestApp {
pub port: u16,
observers: Vec<TestGpuObserver>,
cpu_injectors: Vec<TestCpuInjector>,
}
impl TestApp {
pub async fn start() -> Self {
Self::start_with_groups(&[ApiGroup::GpuControl, ApiGroup::GpuRead, ApiGroup::CpuRead]).await
}
pub async fn start_with_groups(groups: &[ApiGroup]) -> Self {
Lazy::force(&TRACING);
let enabled_groups = EnabledGroups(groups.iter().cloned().collect::<HashSet<_>>());
let needs_gpu =
groups.contains(&ApiGroup::GpuControl) || groups.contains(&ApiGroup::GpuRead);
let needs_gpu_poller = groups.contains(&ApiGroup::GpuRead);
let needs_cpu = groups.contains(&ApiGroup::CpuRead);
let (gpu_test_tasks, test_gpu_observers, gpu_count) = if needs_gpu {
let (tasks, observers) =
start_gpu_test_tasks().expect("Failed to start gpu test tasks");
let count = tasks.device_count();
(Some(tasks), observers, count)
} else {
(None, Vec::new(), 0)
};
let gpu_power_broadcast = if needs_gpu_poller {
let test_power_gpus: Vec<(usize, TestGpu)> = (0..NUM_GPUS as usize)
.map(|i| (i, TestGpu::init().unwrap().0))
.collect();
let gpu_power_poller = start_gpu_poller(test_power_gpus, 10);
Some(gpu_power_poller.broadcast())
} else {
None
};
let (cpu_test_tasks, cpu_test_injectors, cpu_count, dram_available) = if needs_cpu {
let (tasks, injectors) =
start_cpu_test_tasks().expect("Failed to start cpu test tasks");
let count = tasks.device_count();
(Some(tasks), injectors, count, vec![true; count])
} else {
(None, Vec::new(), 0, vec![])
};
let cpu_power_broadcast = if needs_cpu {
let cpu_power_cpus: Vec<(usize, PowerTestCpu)> = (0..NUM_CPUS)
.map(|i| {
(
i,
PowerTestCpu::new(
POWER_TEST_CPU_INCREMENT_UJ,
POWER_TEST_DRAM_INCREMENT_UJ,
),
)
})
.collect();
let cpu_power_poller = start_cpu_poller(cpu_power_cpus, POWER_TEST_POLL_HZ);
Some(cpu_power_poller.broadcast())
} else {
None
};
let discovery_info = DiscoveryInfo {
gpu_ids: (0..gpu_count).collect(),
cpu_ids: (0..cpu_count).collect(),
dram_available,
enabled_api_groups: groups.iter().map(|g| g.to_string()).collect(),
auth_required: false,
};
let state = ServerState {
gpu_device_tasks: gpu_test_tasks,
cpu_device_tasks: cpu_test_tasks,
gpu_power_broadcast,
cpu_power_broadcast,
discovery_info,
enabled_groups,
signing_key: None,
};
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind TCP listener");
let port = listener.local_addr().unwrap().port();
let server = start_server_tcp(listener, state, 2).expect("Failed to start server");
let _ = tokio::spawn(server);
TestApp {
port,
observers: test_gpu_observers,
cpu_injectors: cpu_test_injectors,
}
}
pub async fn start_with_auth(signing_key: &[u8]) -> Self {
Self::start_with_auth_and_groups(
signing_key,
&[ApiGroup::GpuControl, ApiGroup::GpuRead, ApiGroup::CpuRead],
)
.await
}
pub async fn start_with_auth_and_groups(signing_key: &[u8], groups: &[ApiGroup]) -> Self {
Lazy::force(&TRACING);
let enabled_groups = EnabledGroups(groups.iter().cloned().collect::<HashSet<_>>());
let needs_gpu =
groups.contains(&ApiGroup::GpuControl) || groups.contains(&ApiGroup::GpuRead);
let needs_gpu_poller = groups.contains(&ApiGroup::GpuRead);
let needs_cpu = groups.contains(&ApiGroup::CpuRead);
let (gpu_test_tasks, test_gpu_observers, gpu_count) = if needs_gpu {
let (tasks, observers) =
start_gpu_test_tasks().expect("Failed to start gpu test tasks");
let count = tasks.device_count();
(Some(tasks), observers, count)
} else {
(None, Vec::new(), 0)
};
let gpu_power_broadcast = if needs_gpu_poller {
let test_power_gpus: Vec<(usize, TestGpu)> = (0..NUM_GPUS as usize)
.map(|i| (i, TestGpu::init().unwrap().0))
.collect();
let gpu_power_poller = start_gpu_poller(test_power_gpus, 10);
Some(gpu_power_poller.broadcast())
} else {
None
};
let (cpu_test_tasks, cpu_test_injectors, cpu_count, dram_available) = if needs_cpu {
let (tasks, injectors) =
start_cpu_test_tasks().expect("Failed to start cpu test tasks");
let count = tasks.device_count();
(Some(tasks), injectors, count, vec![true; count])
} else {
(None, Vec::new(), 0, vec![])
};
let cpu_power_broadcast = if needs_cpu {
let cpu_power_cpus: Vec<(usize, PowerTestCpu)> = (0..NUM_CPUS)
.map(|i| {
(
i,
PowerTestCpu::new(
POWER_TEST_CPU_INCREMENT_UJ,
POWER_TEST_DRAM_INCREMENT_UJ,
),
)
})
.collect();
let cpu_power_poller = start_cpu_poller(cpu_power_cpus, POWER_TEST_POLL_HZ);
Some(cpu_power_poller.broadcast())
} else {
None
};
let signing_key_data = SigningKeyData(Arc::new(jsonwebtoken::DecodingKey::from_secret(
signing_key,
)));
let discovery_info = DiscoveryInfo {
gpu_ids: (0..gpu_count).collect(),
cpu_ids: (0..cpu_count).collect(),
dram_available,
enabled_api_groups: groups.iter().map(|g| g.to_string()).collect(),
auth_required: true,
};
let state = ServerState {
gpu_device_tasks: gpu_test_tasks,
cpu_device_tasks: cpu_test_tasks,
gpu_power_broadcast,
cpu_power_broadcast,
discovery_info,
enabled_groups,
signing_key: Some(signing_key_data),
};
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind TCP listener");
let port = listener.local_addr().unwrap().port();
let server = start_server_tcp(listener, state, 2).expect("Failed to start server");
let _ = tokio::spawn(server);
TestApp {
port,
observers: test_gpu_observers,
cpu_injectors: cpu_test_injectors,
}
}
pub fn send<T: ZeusdRequest>(
&mut self,
payload: T,
) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>> {
let client = reqwest::Client::new();
let url = T::build_url(self);
let method = T::http_method();
client.request(method, url).query(&payload).send()
}
pub fn persistence_mode_history_for_gpu(&mut self, gpu_id: usize) -> Vec<bool> {
let rx = &mut self.observers[gpu_id].persistence_mode_rx;
std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
pub fn power_limit_history_for_gpu(&mut self, gpu_id: usize) -> Vec<u32> {
let rx = &mut self.observers[gpu_id].power_limit_rx;
std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
pub fn gpu_locked_clocks_history_for_gpu(&mut self, gpu_id: usize) -> Vec<(u32, u32)> {
let rx = &mut self.observers[gpu_id].gpu_locked_clocks_rx;
std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
pub fn mem_locked_clocks_history_for_gpu(&mut self, gpu_id: usize) -> Vec<(u32, u32)> {
let rx = &mut self.observers[gpu_id].mem_locked_clocks_rx;
std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
pub fn set_cpu_energy_measurements(&mut self, cpu_id: usize, measurements: &Vec<u64>) {
for measurement in measurements {
self.cpu_injectors[cpu_id].cpu.send(*measurement).unwrap();
}
}
pub fn set_dram_energy_measurements(&mut self, cpu_id: usize, measurements: &Vec<u64>) {
for measurement in measurements {
self.cpu_injectors[cpu_id].dram.send(*measurement).unwrap();
}
}
}