use std::collections::HashMap;
use std::time::Instant;
use actix_web::web::Bytes;
use actix_web::{web, HttpResponse};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use crate::devices::cpu::power::{CpuPowerBroadcast, CpuPowerSnapshot};
use crate::devices::cpu::{CpuCommand, CpuManagementTasks, RaplResponse};
use crate::error::ZeusdError;
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CpuReadQuery {
pub cpu_ids: Option<String>,
}
fn parse_cpu_ids(raw: &str) -> Vec<usize> {
raw.split(',')
.filter_map(|part| part.trim().parse().ok())
.collect()
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GetCumulativeEnergy {
pub cpu_ids: String,
pub cpu: bool,
pub dram: bool,
}
impl From<GetCumulativeEnergy> for CpuCommand {
fn from(request: GetCumulativeEnergy) -> Self {
CpuCommand::GetIndexEnergy {
cpu: request.cpu,
dram: request.dram,
}
}
}
#[actix_web::get("/get_cumulative_energy")]
#[tracing::instrument(
skip(query, device_tasks),
fields(
cpu_ids = %query.cpu_ids,
cpu = %query.cpu,
dram = %query.dram,
)
)]
async fn get_cumulative_energy_handler(
query: web::Query<GetCumulativeEnergy>,
device_tasks: web::Data<CpuManagementTasks>,
) -> Result<HttpResponse, ZeusdError> {
let now = Instant::now();
tracing::info!("Received request");
let cpu_ids = parse_cpu_ids(&query.cpu_ids);
if cpu_ids.is_empty() {
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
"error": "cpu_ids must contain at least one CPU index"
})));
}
let device_count = device_tasks.device_count();
for &id in &cpu_ids {
if id >= device_count {
return Err(ZeusdError::CpuNotFoundError(id));
}
}
let mut handles = Vec::with_capacity(cpu_ids.len());
for &cpu_id in &cpu_ids {
let cmd: CpuCommand = CpuCommand::GetIndexEnergy {
cpu: query.cpu,
dram: query.dram,
};
let tasks = device_tasks.clone();
handles.push(async move { (cpu_id, tasks.send_command_blocking(cpu_id, cmd, now).await) });
}
let results = futures::future::join_all(handles).await;
let mut response_map: HashMap<String, RaplResponse> = HashMap::new();
let mut errors: HashMap<String, String> = HashMap::new();
for (cpu_id, result) in results {
match result {
Ok(measurement) => {
response_map.insert(cpu_id.to_string(), measurement);
}
Err(e) => {
errors.insert(cpu_id.to_string(), e.to_string());
}
}
}
if errors.is_empty() {
Ok(HttpResponse::Ok().json(response_map))
} else {
Ok(HttpResponse::InternalServerError().json(serde_json::json!({
"errors": errors
})))
}
}
fn filter_cpu_snapshot(
snapshot: &CpuPowerSnapshot,
cpu_ids: &Option<Vec<usize>>,
) -> CpuPowerSnapshot {
match cpu_ids {
None => snapshot.clone(),
Some(ids) => CpuPowerSnapshot {
timestamp_ms: snapshot.timestamp_ms,
power_mw: snapshot
.power_mw
.iter()
.filter(|(k, _)| ids.contains(k))
.map(|(&k, v)| (k, v.clone()))
.collect(),
},
}
}
#[actix_web::get("/get_power")]
#[tracing::instrument(skip(broadcast), fields(cpu_ids = ?query.cpu_ids))]
async fn get_cpu_power_handler(
query: web::Query<CpuReadQuery>,
broadcast: web::Data<CpuPowerBroadcast>,
) -> HttpResponse {
tracing::info!("Received request");
let cpu_ids = query.cpu_ids.as_ref().map(|s| parse_cpu_ids(s));
if let Some(ref ids) = cpu_ids {
if let Err(unknown) = broadcast.validate_ids(ids) {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!(
"Unknown CPU indices: {:?}. Available: {:?}",
unknown,
broadcast.valid_ids(),
)
}));
}
}
let _guard = broadcast.add_subscriber();
let snapshot = broadcast.wait_for_fresh().await.unwrap_or_default();
let filtered = filter_cpu_snapshot(&snapshot, &cpu_ids);
HttpResponse::Ok().json(filtered)
}
#[actix_web::get("/stream_power")]
#[tracing::instrument(skip(broadcast), fields(cpu_ids = ?query.cpu_ids))]
async fn cpu_power_stream_handler(
query: web::Query<CpuReadQuery>,
broadcast: web::Data<CpuPowerBroadcast>,
) -> HttpResponse {
tracing::info!("Received request");
let cpu_ids = query.cpu_ids.as_ref().map(|s| parse_cpu_ids(s));
if let Some(ref ids) = cpu_ids {
if let Err(unknown) = broadcast.validate_ids(ids) {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!(
"Unknown CPU indices: {:?}. Available: {:?}",
unknown,
broadcast.valid_ids(),
)
}));
}
}
let guard = broadcast.add_subscriber();
let stream = broadcast.stream().map(move |snapshot| {
let _ = &guard;
let filtered = filter_cpu_snapshot(&snapshot, &cpu_ids);
let json = serde_json::to_string(&filtered).unwrap_or_default();
Ok::<_, actix_web::Error>(Bytes::from(format!("data: {json}\n\n")))
});
HttpResponse::Ok()
.insert_header(("Content-Type", "text/event-stream"))
.insert_header(("Cache-Control", "no-cache"))
.streaming(stream)
}
pub fn cpu_routes(cfg: &mut web::ServiceConfig) {
cfg.service(get_cumulative_energy_handler)
.service(get_cpu_power_handler)
.service(cpu_power_stream_handler);
}