zeusd/devices/cpu/
linux.rs

1//! CPU power measurement with RAPL. Only supported on Linux.
2
3use once_cell::sync::OnceCell;
4use std::fs;
5use std::io::Read;
6use std::path::{Path, PathBuf};
7use std::string::String;
8use std::sync::{Arc, RwLock};
9use tokio::io::AsyncReadExt;
10use tokio::task::JoinHandle;
11use tokio::time::{sleep, Duration};
12
13use crate::devices::cpu::{CpuManager, PackageInfo};
14use crate::error::ZeusdError;
15
// NOTE: To support Zeusd deployment in a docker container, this should support
//       sysfs mounts under places like `/zeus_sys`.
/// Root of the Linux powercap RAPL hierarchy; package zones live at `intel-rapl:<index>`.
const RAPL_DIR: &str = "/sys/class/powercap/intel-rapl";

/// Upper bound on how much a RAPL energy counter (in µJ) can grow during one 0.1 s polling
/// interval, assuming a maximum power draw of 1000 W: 1000 W × 1e6 µJ/J × 0.1 s = 1e8 µJ.
const RAPL_COUNTER_MAX_INCREASE: u64 = 1_000 * 100_000;
23
24pub struct RaplCpu {
25    cpu: Arc<PackageInfo>,
26    dram: Option<Arc<PackageInfo>>,
27    cpu_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
28    dram_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
29}
30
31impl RaplCpu {
32    pub fn init(_index: usize) -> Result<Self, ZeusdError> {
33        let fields = RaplCpu::get_available_fields(_index)?;
34        Ok(Self {
35            cpu: fields.0,
36            dram: fields.1,
37            cpu_monitoring_task: OnceCell::new(),
38            dram_monitoring_task: OnceCell::new(),
39        })
40    }
41}
42
43impl PackageInfo {
44    pub fn new(base_path: &Path, index: usize) -> anyhow::Result<Self, ZeusdError> {
45        let cpu_name_path = base_path.join("name");
46        let cpu_energy_path = base_path.join("energy_uj");
47        let cpu_max_energy_path = base_path.join("max_energy_range_uj");
48
49        if !cpu_name_path.exists() || !cpu_max_energy_path.exists() || !cpu_energy_path.exists() {
50            return Err(ZeusdError::CpuInitializationError(index));
51        }
52
53        let cpu_name = fs::read_to_string(&cpu_name_path)?.trim_end().to_string();
54        // Try reding from energy_uj file
55        read_u64(&cpu_energy_path)?;
56        let cpu_max_energy = read_u64(&cpu_max_energy_path)?;
57        let wraparound_counter = RwLock::new(0);
58        Ok(PackageInfo {
59            index,
60            name: cpu_name,
61            energy_uj_path: cpu_energy_path,
62            max_energy_uj: cpu_max_energy,
63            num_wraparounds: wraparound_counter,
64        })
65    }
66}
67
68impl CpuManager for RaplCpu {
69    fn device_count() -> Result<usize, ZeusdError> {
70        let mut index_count = 0;
71        let base_path = PathBuf::from(RAPL_DIR);
72
73        match fs::read_dir(&base_path) {
74            Ok(entries) => {
75                for entry in entries.flatten() {
76                    let path = entry.path();
77                    if path.is_dir() {
78                        if let Some(dir_name_str) = path.file_name() {
79                            let dir_name = dir_name_str.to_string_lossy();
80                            if dir_name.contains("intel-rapl") {
81                                index_count += 1;
82                            }
83                        }
84                    }
85                }
86            }
87            Err(_) => {
88                tracing::error!("RAPL not available");
89            }
90        };
91        Ok(index_count)
92    }
93
94    fn get_available_fields(
95        index: usize,
96    ) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
97        let base_path = PathBuf::from(format!("{}/intel-rapl:{}", RAPL_DIR, index));
98        let cpu_info = PackageInfo::new(&base_path, index)?;
99
100        match fs::read_dir(&base_path) {
101            Ok(entries) => {
102                for entry in entries.flatten() {
103                    let path = entry.path();
104                    if path.is_dir() {
105                        if let Some(dir_name_str) = path.file_name() {
106                            let dir_name = dir_name_str.to_string_lossy();
107                            if dir_name.contains("intel-rapl") {
108                                let subpackage_path = base_path.join(&*dir_name);
109                                let subpackage_info = PackageInfo::new(&subpackage_path, index)?;
110                                if subpackage_info.name == "dram" {
111                                    return Ok((
112                                        Arc::new(cpu_info),
113                                        Some(Arc::new(subpackage_info)),
114                                    ));
115                                }
116                            }
117                        }
118                    }
119                }
120            }
121            Err(_) => {
122                return Err(ZeusdError::CpuInitializationError(index));
123            }
124        };
125
126        Ok((Arc::new(cpu_info), None))
127    }
128
129    fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
130        // Assume that RAPL counter will not wrap around twice during a request to poll energy. The
131        // number of wrap arounds is polled twice to handle the case where the counter wraps around
132        // a request. If this happens, `measurement` has to be updated as to not return an
133        // unexpectedly large energy value.
134
135        let handle = self
136            .cpu_monitoring_task
137            .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu))));
138        if handle.is_finished() {
139            return Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index));
140        }
141
142        let num_wraparounds_before = *self
143            .cpu
144            .num_wraparounds
145            .read()
146            .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
147        let mut measurement = read_u64(&self.cpu.energy_uj_path)?;
148        let num_wraparounds = *self
149            .cpu
150            .num_wraparounds
151            .read()
152            .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
153        if num_wraparounds != num_wraparounds_before {
154            // Wraparound has happened after measurement, take measurement again
155            measurement = read_u64(&self.cpu.energy_uj_path)?;
156        }
157
158        Ok(measurement + num_wraparounds * self.cpu.max_energy_uj)
159    }
160
161    fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
162        match &self.dram {
163            None => Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)),
164            Some(dram) => {
165                let handle = self
166                    .dram_monitoring_task
167                    .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(dram))));
168                if handle.is_finished() {
169                    return Err(ZeusdError::CpuManagementTaskTerminatedError(dram.index));
170                }
171
172                let num_wraparounds_before = *dram
173                    .num_wraparounds
174                    .read()
175                    .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
176                let mut measurement = read_u64(&dram.energy_uj_path)?;
177                let num_wraparounds = *dram
178                    .num_wraparounds
179                    .read()
180                    .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
181                if num_wraparounds != num_wraparounds_before {
182                    // Wraparound has happened after measurement, take measurement again
183                    measurement = read_u64(&dram.energy_uj_path)?;
184                }
185
186                Ok(measurement + num_wraparounds * dram.max_energy_uj)
187            }
188        }
189    }
190
191    fn stop_monitoring(&mut self) {
192        if let Some(handle) = self.cpu_monitoring_task.take() {
193            handle.abort();
194        }
195        if let Some(handle) = self.dram_monitoring_task.take() {
196            handle.abort();
197        }
198    }
199
200    fn is_dram_available(&self) -> bool {
201        self.dram.is_some()
202    }
203}
204
/// Reads a file containing a single unsigned integer (e.g. a sysfs `energy_uj` file).
///
/// Surrounding whitespace, including a trailing newline, is ignored.
///
/// # Errors
///
/// Returns the I/O error from opening or reading `path`. Returns an error of kind
/// `std::io::ErrorKind::InvalidData` if the trimmed contents do not parse as a `u64`.
fn read_u64(path: &Path) -> std::io::Result<u64> {
    let mut file = fs::File::open(path)?;
    let mut contents = String::new();
    file.read_to_string(&mut contents)?;
    contents
        .trim()
        .parse()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
213
214async fn read_u64_async(path: &PathBuf) -> Result<u64, std::io::Error> {
215    let mut file = tokio::fs::File::open(path).await?;
216    let mut buf = String::new();
217    file.read_to_string(&mut buf).await?;
218    buf.trim()
219        .parse()
220        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
221}
222
223async fn monitor_rapl(rapl_file: Arc<PackageInfo>) -> Result<(), ZeusdError> {
224    let mut last_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;
225    tracing::info!(
226        "Monitoring started for {}",
227        rapl_file.energy_uj_path.display()
228    );
229    loop {
230        let current_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;
231
232        if current_energy_uj < last_energy_uj {
233            let mut wraparound_guard = rapl_file
234                .num_wraparounds
235                .write()
236                .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?;
237            *wraparound_guard += 1;
238        }
239        last_energy_uj = current_energy_uj;
240        let sleep_time = if rapl_file.max_energy_uj - current_energy_uj < RAPL_COUNTER_MAX_INCREASE
241        {
242            100
243        } else {
244            1000
245        };
246        sleep(Duration::from_millis(sleep_time)).await;
247    }
248}