zeusd/devices/cpu/
linux.rs1use once_cell::sync::OnceCell;
4use std::fs;
5use std::io::Read;
6use std::path::{Path, PathBuf};
7use std::string::String;
8use std::sync::{Arc, RwLock};
9use tokio::io::AsyncReadExt;
10use tokio::task::JoinHandle;
11use tokio::time::{sleep, Duration};
12
13use crate::devices::cpu::{CpuManager, PackageInfo};
14use crate::error::ZeusdError;
15
/// Root of the powercap sysfs tree for Intel RAPL. Each CPU package is expected to appear as an
/// `intel-rapl:<index>` directory underneath it.
const RAPL_DIR: &str = "/sys/class/powercap/intel-rapl";

/// Headroom threshold in microjoules (100 J). While a zone's counter is closer than this to its
/// `max_energy_uj`, `monitor_rapl` polls every 100 ms instead of every 1000 ms.
/// NOTE(review): the name suggests an assumed upper bound on counter growth during one slow
/// polling interval; confirm against the target hardware.
const RAPL_COUNTER_MAX_INCREASE: u64 = 100 * 1_000_000;

24pub struct RaplCpu {
25 cpu: Arc<PackageInfo>,
26 dram: Option<Arc<PackageInfo>>,
27 cpu_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
28 dram_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
29}
30
31impl RaplCpu {
32 pub fn init(_index: usize) -> Result<Self, ZeusdError> {
33 let fields = RaplCpu::get_available_fields(_index)?;
34 Ok(Self {
35 cpu: fields.0,
36 dram: fields.1,
37 cpu_monitoring_task: OnceCell::new(),
38 dram_monitoring_task: OnceCell::new(),
39 })
40 }
41}
42
43impl PackageInfo {
44 pub fn new(base_path: &Path, index: usize) -> anyhow::Result<Self, ZeusdError> {
45 let cpu_name_path = base_path.join("name");
46 let cpu_energy_path = base_path.join("energy_uj");
47 let cpu_max_energy_path = base_path.join("max_energy_range_uj");
48
49 if !cpu_name_path.exists() || !cpu_max_energy_path.exists() || !cpu_energy_path.exists() {
50 return Err(ZeusdError::CpuInitializationError(index));
51 }
52
53 let cpu_name = fs::read_to_string(&cpu_name_path)?.trim_end().to_string();
54 read_u64(&cpu_energy_path)?;
56 let cpu_max_energy = read_u64(&cpu_max_energy_path)?;
57 let wraparound_counter = RwLock::new(0);
58 Ok(PackageInfo {
59 index,
60 name: cpu_name,
61 energy_uj_path: cpu_energy_path,
62 max_energy_uj: cpu_max_energy,
63 num_wraparounds: wraparound_counter,
64 })
65 }
66}
67
68impl CpuManager for RaplCpu {
69 fn device_count() -> Result<usize, ZeusdError> {
70 let mut index_count = 0;
71 let base_path = PathBuf::from(RAPL_DIR);
72
73 match fs::read_dir(&base_path) {
74 Ok(entries) => {
75 for entry in entries.flatten() {
76 let path = entry.path();
77 if path.is_dir() {
78 if let Some(dir_name_str) = path.file_name() {
79 let dir_name = dir_name_str.to_string_lossy();
80 if dir_name.contains("intel-rapl") {
81 index_count += 1;
82 }
83 }
84 }
85 }
86 }
87 Err(_) => {
88 tracing::error!("RAPL not available");
89 }
90 };
91 Ok(index_count)
92 }
93
94 fn get_available_fields(
95 index: usize,
96 ) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
97 let base_path = PathBuf::from(format!("{}/intel-rapl:{}", RAPL_DIR, index));
98 let cpu_info = PackageInfo::new(&base_path, index)?;
99
100 match fs::read_dir(&base_path) {
101 Ok(entries) => {
102 for entry in entries.flatten() {
103 let path = entry.path();
104 if path.is_dir() {
105 if let Some(dir_name_str) = path.file_name() {
106 let dir_name = dir_name_str.to_string_lossy();
107 if dir_name.contains("intel-rapl") {
108 let subpackage_path = base_path.join(&*dir_name);
109 let subpackage_info = PackageInfo::new(&subpackage_path, index)?;
110 if subpackage_info.name == "dram" {
111 return Ok((
112 Arc::new(cpu_info),
113 Some(Arc::new(subpackage_info)),
114 ));
115 }
116 }
117 }
118 }
119 }
120 }
121 Err(_) => {
122 return Err(ZeusdError::CpuInitializationError(index));
123 }
124 };
125
126 Ok((Arc::new(cpu_info), None))
127 }
128
129 fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
130 let handle = self
136 .cpu_monitoring_task
137 .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu))));
138 if handle.is_finished() {
139 return Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index));
140 }
141
142 let num_wraparounds_before = *self
143 .cpu
144 .num_wraparounds
145 .read()
146 .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
147 let mut measurement = read_u64(&self.cpu.energy_uj_path)?;
148 let num_wraparounds = *self
149 .cpu
150 .num_wraparounds
151 .read()
152 .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
153 if num_wraparounds != num_wraparounds_before {
154 measurement = read_u64(&self.cpu.energy_uj_path)?;
156 }
157
158 Ok(measurement + num_wraparounds * self.cpu.max_energy_uj)
159 }
160
161 fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
162 match &self.dram {
163 None => Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)),
164 Some(dram) => {
165 let handle = self
166 .dram_monitoring_task
167 .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(dram))));
168 if handle.is_finished() {
169 return Err(ZeusdError::CpuManagementTaskTerminatedError(dram.index));
170 }
171
172 let num_wraparounds_before = *dram
173 .num_wraparounds
174 .read()
175 .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
176 let mut measurement = read_u64(&dram.energy_uj_path)?;
177 let num_wraparounds = *dram
178 .num_wraparounds
179 .read()
180 .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
181 if num_wraparounds != num_wraparounds_before {
182 measurement = read_u64(&dram.energy_uj_path)?;
184 }
185
186 Ok(measurement + num_wraparounds * dram.max_energy_uj)
187 }
188 }
189 }
190
191 fn stop_monitoring(&mut self) {
192 if let Some(handle) = self.cpu_monitoring_task.take() {
193 handle.abort();
194 }
195 if let Some(handle) = self.dram_monitoring_task.take() {
196 handle.abort();
197 }
198 }
199
200 fn is_dram_available(&self) -> bool {
201 self.dram.is_some()
202 }
203}
204
/// Reads the file at `path` and parses its whitespace-trimmed contents as a `u64`.
///
/// Accepts `&Path`, so `&PathBuf` arguments still work through deref coercion.
///
/// # Errors
///
/// Propagates the I/O error from opening or reading the file. Returns an error of kind
/// `std::io::ErrorKind::InvalidData` if the contents are not a valid `u64`.
fn read_u64(path: &Path) -> std::io::Result<u64> {
    let mut contents = String::new();
    std::fs::File::open(path)?.read_to_string(&mut contents)?;
    contents
        .trim()
        .parse()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}

214async fn read_u64_async(path: &PathBuf) -> Result<u64, std::io::Error> {
215 let mut file = tokio::fs::File::open(path).await?;
216 let mut buf = String::new();
217 file.read_to_string(&mut buf).await?;
218 buf.trim()
219 .parse()
220 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
221}
222
223async fn monitor_rapl(rapl_file: Arc<PackageInfo>) -> Result<(), ZeusdError> {
224 let mut last_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;
225 tracing::info!(
226 "Monitoring started for {}",
227 rapl_file.energy_uj_path.display()
228 );
229 loop {
230 let current_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;
231
232 if current_energy_uj < last_energy_uj {
233 let mut wraparound_guard = rapl_file
234 .num_wraparounds
235 .write()
236 .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?;
237 *wraparound_guard += 1;
238 }
239 last_energy_uj = current_energy_uj;
240 let sleep_time = if rapl_file.max_energy_uj - current_energy_uj < RAPL_COUNTER_MAX_INCREASE
241 {
242 100
243 } else {
244 1000
245 };
246 sleep(Duration::from_millis(sleep_time)).await;
247 }
248}