use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::Serialize;
use tokio::sync::{watch, Notify};
use tokio::time::{interval, Duration};
use crate::devices::gpu::GpuManager;
use crate::power_streaming::{PowerBroadcast, PowerPoller};
#[derive(Clone, Debug, Default, Serialize)]
pub struct GpuPowerSnapshot {
pub timestamp_ms: u64,
pub power_mw: BTreeMap<usize, u32>,
}
pub type GpuPowerBroadcast = PowerBroadcast<GpuPowerSnapshot>;
pub type GpuPowerPoller = PowerPoller<GpuPowerSnapshot>;
pub fn start_gpu_poller<T: GpuManager + Send + 'static>(
gpus: Vec<(usize, T)>,
poll_hz: u32,
) -> GpuPowerPoller {
let valid_ids = gpus.iter().map(|(idx, _)| *idx).collect();
PowerPoller::start(valid_ids, |tx, subscriber_count, wake| {
gpu_power_poll_task(gpus, tx, poll_hz, subscriber_count, wake)
})
}
async fn gpu_power_poll_task<T: GpuManager>(
mut gpus: Vec<(usize, T)>,
tx: watch::Sender<GpuPowerSnapshot>,
poll_hz: u32,
subscriber_count: Arc<AtomicUsize>,
wake: Arc<Notify>,
) {
if gpus.is_empty() {
tracing::info!("No GPUs to monitor, power poller idle");
std::future::pending::<()>().await;
return;
}
let period_us = 1_000_000u64 / poll_hz.max(1) as u64;
let mut last_power: BTreeMap<usize, u32> = BTreeMap::new();
tracing::info!(
"GPU power poller ready: {} GPUs at {} Hz when subscribers are present",
gpus.len(),
poll_hz
);
loop {
while subscriber_count.load(Ordering::Relaxed) == 0 {
wake.notified().await;
}
tracing::info!("GPU power poller starting");
let mut tick = interval(Duration::from_micros(period_us));
while subscriber_count.load(Ordering::Relaxed) > 0 {
tick.tick().await;
let mut current_power = BTreeMap::new();
let mut changed = false;
for (idx, gpu) in gpus.iter_mut() {
match gpu.get_instant_power_mw() {
Ok(power_mw) => {
if last_power.get(idx) != Some(&power_mw) {
changed = true;
}
current_power.insert(*idx, power_mw);
}
Err(e) => {
tracing::warn!("Failed to read power for GPU {}: {}", idx, e);
if let Some(&last) = last_power.get(idx) {
current_power.insert(*idx, last);
}
}
}
}
if changed || last_power.is_empty() {
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
last_power.clone_from(¤t_power);
let _ = tx.send(GpuPowerSnapshot {
timestamp_ms,
power_mw: current_power,
});
}
}
last_power.clear();
tracing::info!("GPU power poller pausing (no subscribers)");
}
}