cgroups_rs/systemd/dbus/
client.rs

1// Copyright 2021-2023 Kata Contributors
2// Copyright (c) 2025 Ant Group
3//
4// SPDX-License-Identifier: Apache-2.0 or MIT
5//
6
7use zbus::zvariant::Value;
8use zbus::{Error as ZbusError, Result as ZbusResult};
9
10use crate::systemd::dbus::error::{Error, Result};
11use crate::systemd::dbus::proxy::systemd_manager_proxy;
12use crate::systemd::{Property, NO_SUCH_UNIT, PIDS, UNIT_MODE_REPLACE};
13use crate::CgroupPid;
14
15pub struct SystemdClient<'a> {
16    /// The name of the systemd unit (slice or scope)
17    unit: String,
18    props: Vec<Property<'a>>,
19}
20
21impl<'a> SystemdClient<'a> {
22    pub fn new(unit: &str, props: Vec<Property<'a>>) -> Result<Self> {
23        Ok(Self {
24            unit: unit.to_string(),
25            props,
26        })
27    }
28}
29
30impl SystemdClient<'_> {
31    /// Set the pid to the PIDs property of the unit.
32    ///
33    /// Append a process ID to the PIDs property of the unit. If not
34    /// exists, one property will be created.
35    pub fn set_pid_prop(&mut self, pid: CgroupPid) -> Result<()> {
36        if self.exists() {
37            return Ok(());
38        }
39
40        for prop in self.props.iter_mut() {
41            if prop.0 == PIDS {
42                // If PIDS is already set, we append the new pid to the existing list.
43                if let Value::Array(arr) = &mut prop.1 {
44                    arr.append(pid.pid.into())
45                        .map_err(|_| Error::InvalidProperties)?;
46                    return Ok(());
47                }
48                // Invalid type of PIDs
49                return Err(Error::InvalidProperties);
50            }
51        }
52        // If PIDS is not set, we create a new property.
53        self.props
54            .push((PIDS, Value::Array(vec![pid.pid as u32].into())));
55        Ok(())
56    }
57
58    /// Start a slice or a scope unit controlled and supervised by systemd.
59    ///
60    /// For more information, see:
61    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.unit.html
62    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.slice.html
63    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.scope.html
64    pub fn start(&self) -> Result<()> {
65        // PIDs property must be present
66        if !self.props.iter().any(|(k, _)| k == &PIDS) {
67            return Err(Error::InvalidProperties);
68        }
69
70        let sys_proxy = systemd_manager_proxy()?;
71
72        let props_borrowed: Vec<(&str, &zbus::zvariant::Value)> =
73            self.props.iter().map(|(k, v)| (*k, v)).collect();
74        let props_borrowed: Vec<&(&str, &Value)> = props_borrowed.iter().collect();
75
76        sys_proxy.start_transient_unit(&self.unit, UNIT_MODE_REPLACE, &props_borrowed, &[])?;
77
78        Ok(())
79    }
80
81    /// Stop the current transient unit, the processes will be killed on
82    /// unit stop, see [1].
83    ///
84    /// 1. https://www.freedesktop.org/software/systemd/man/latest/systemd.kill.html#KillMode=
85    pub fn stop(&self) -> Result<()> {
86        let sys_proxy = systemd_manager_proxy()?;
87
88        let ret = sys_proxy.stop_unit(&self.unit, UNIT_MODE_REPLACE);
89        ignore_no_such_unit(ret)?;
90
91        // If we stop the unit and it still exists, it may be in a failed
92        // state, so we will try to reset it.
93        if self.exists() {
94            let ret = sys_proxy.reset_failed_unit(&self.unit);
95            ignore_no_such_unit(ret)?;
96        }
97
98        Ok(())
99    }
100
101    /// Set properties for the unit through dbus `SetUnitProperties`.
102    pub fn set_properties(&mut self, properties: &[Property<'static>]) -> Result<()> {
103        for prop in properties {
104            let new = prop.1.try_clone().map_err(|_| Error::InvalidProperties)?;
105            // Try to update the value first, if fails, append it.
106            if let Some(existing) = self.props.iter_mut().find(|p| p.0 == prop.0) {
107                existing.1 = new;
108            } else {
109                self.props.push((prop.0, new));
110            }
111        }
112
113        // The unit must exist before setting properties.
114        if !self.exists() {
115            return Ok(());
116        }
117
118        let sys_proxy = systemd_manager_proxy()?;
119
120        let props_borrowed: Vec<(&str, &Value)> = properties.iter().map(|(k, v)| (*k, v)).collect();
121        let props_borrowed: Vec<&(&str, &Value)> = props_borrowed.iter().collect();
122
123        sys_proxy.set_unit_properties(&self.unit, true, &props_borrowed)?;
124
125        Ok(())
126    }
127
128    /// Freeze the unit through dbus `FreezeUnit`.
129    pub fn freeze(&self) -> Result<()> {
130        let sys_proxy = systemd_manager_proxy()?;
131
132        sys_proxy.freeze_unit(&self.unit)?;
133
134        Ok(())
135    }
136
137    /// Thaw the frozen unit through dbus `ThawUnit`.
138    pub fn thaw(&self) -> Result<()> {
139        let sys_proxy = systemd_manager_proxy()?;
140
141        sys_proxy.thaw_unit(&self.unit)?;
142
143        Ok(())
144    }
145
146    /// Check if the unit exists.
147    pub fn exists(&self) -> bool {
148        let sys_proxy = match systemd_manager_proxy() {
149            Ok(proxy) => proxy,
150            _ => return false,
151        };
152
153        sys_proxy
154            .get_unit(&self.unit)
155            .map(|_| true)
156            .unwrap_or_default()
157    }
158
159    /// Add a process (tgid) to the unit through dbus
160    /// `AttachProcessesToUnit`.
161    pub fn add_process(&self, pid: CgroupPid, subcgroup: &str) -> Result<()> {
162        let sys_proxy = systemd_manager_proxy()?;
163
164        sys_proxy.attach_processes_to_unit(&self.unit, subcgroup, &[pid.pid as u32])?;
165
166        Ok(())
167    }
168}
169
170fn ignore_no_such_unit<T>(result: ZbusResult<T>) -> ZbusResult<bool> {
171    if let Err(ZbusError::MethodError(err_name, _, _)) = &result {
172        if err_name.as_str() == NO_SUCH_UNIT {
173            return Ok(true);
174        }
175    }
176    result.map(|_| false)
177}
178
#[cfg(test)]
pub mod tests {
    //! Unit tests for the SystemdClient
    //!
    //! These tests are known to fail when run in parallel (the cause has
    //! not been identified yet), so run them serially:
    //!
    //! ```sh
    //! $ cargo test --package cgroups-rs --lib \
    //!   -- systemd::dbus::client::tests \
    //!   --show-output --test-threads=1
    //! ```

    use std::fs;
    use std::path::Path;
    use std::process::Command;
    use std::thread::sleep;
    use std::time::Duration;

    use rand::distributions::Alphanumeric;
    use rand::Rng;

    use crate::fs::hierarchies;
    use crate::systemd::dbus::client::*;
    use crate::systemd::props::PropertiesBuilder;
    use crate::systemd::utils::expand_slice;
    use crate::systemd::{DEFAULT_DESCRIPTION, DESCRIPTION, PIDS};
    use crate::tests::{spawn_sleep_inf, spawn_yes};

    const TEST_SLICE: &str = "cgroupsrs-test.slice";

    /// Generate a scope unit name with a random 5-character suffix so that
    /// tests do not collide on the unit name.
    fn test_unit() -> String {
        let rand_string: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(5)
            .map(char::from)
            .collect();
        format!("cri-pod{}.scope", rand_string)
    }

    /// Return early from the calling test when no systemd version can be
    /// detected.
    #[macro_export]
    macro_rules! skip_if_no_systemd {
        () => {
            if $crate::tests::systemd_version().is_none() {
                eprintln!("Test skipped, no systemd?");
                return;
            }
        };
    }

    /// Return the stdout of `systemctl show <unit>`.
    fn systemd_show(unit: &str) -> String {
        let output = Command::new("systemctl")
            .arg("show")
            .arg(unit)
            .output()
            .expect("Failed to execute systemctl show command");
        String::from_utf8_lossy(&output.stdout).to_string()
    }

    /// Path of the test slice in the cgroup filesystem.
    ///
    /// On cgroup v2 the slice is looked up directly under `/sys/fs/cgroup`;
    /// otherwise the `memory` hierarchy is used.
    fn test_slice_path() -> String {
        let base = expand_slice(TEST_SLICE).unwrap();
        if hierarchies::is_cgroup2_unified_mode() {
            format!("/sys/fs/cgroup/{}", base)
        } else {
            format!("/sys/fs/cgroup/memory/{}", base)
        }
    }

    /// Poll `cgroup_procs_path` until it lists `pid`, panicking once the
    /// retry attempts are exhausted.
    ///
    /// The file is matched line by line, so a PID cannot be mistaken for a
    /// substring of another PID (e.g. `12` vs `123`).
    fn assert_pid_in_cgroup_procs(cgroup_procs_path: &str, pid: &str) {
        const ATTEMPTS: usize = 5;

        let mut last_read = None;
        for attempt in 0..ATTEMPTS {
            // Wait 500ms before each retry (but not before the first try).
            if attempt > 0 {
                sleep(Duration::from_millis(500));
            }

            let content = fs::read_to_string(cgroup_procs_path);
            if let Ok(content) = &content {
                if content.lines().any(|line| line == pid) {
                    return;
                }
            }
            last_read = Some(content);
        }

        // Retry attempts exhausted, resulting in failure
        panic!(
            "Cgroup procs {} does not contain the process ID {} (last read: {:?})",
            cgroup_procs_path, pid, last_read
        );
    }

    /// Create and start a default-configured unit under `TEST_SLICE` that
    /// contains `pid`, stopping any previous unit of the same name first.
    fn start_default_cgroup(pid: CgroupPid, unit: &'_ str) -> SystemdClient<'_> {
        let mut props = PropertiesBuilder::default_cgroup(TEST_SLICE, unit).build();
        props.push((PIDS, Value::Array(vec![pid.pid as u32].into())));
        let cgroup = SystemdClient::new(unit, props).unwrap();
        // Stop the unit if it exists.
        cgroup.stop().unwrap();

        // Start the unit and attach the given process to it.
        cgroup.start().unwrap();
        cgroup.add_process(pid, "/").unwrap();
        cgroup
    }

    /// Stop the unit, panicking on failure.
    fn stop_cgroup(cgroup: &SystemdClient) {
        cgroup.stop().unwrap();
    }

    #[test]
    fn test_start() {
        skip_if_no_systemd!();

        let v2 = hierarchies::is_cgroup2_unified_mode();
        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Check if the cgroup exists in the filesystem
        let full_base = test_slice_path();
        assert!(
            Path::new(&full_base).exists(),
            "Cgroup base path does not exist: {}",
            full_base
        );

        // PIDs
        let cgroup_procs_path = format!("{}/{}/cgroup.procs", full_base, &unit);
        assert_pid_in_cgroup_procs(&cgroup_procs_path, &child.id().to_string());

        // Check the unit from "systemctl show <unit>"
        let output = systemd_show(&cgroup.unit);

        // Slice
        assert!(
            output
                .lines()
                .any(|line| line == format!("Slice={}", TEST_SLICE)),
            "Slice not found"
        );
        // Delegate
        assert!(
            output.lines().any(|line| line == "Delegate=yes"),
            "Delegate not set"
        );
        // DelegateControllers
        // controllers: cpu cpuacct cpuset io blkio memory devices pids
        let controllers = output
            .lines()
            .find(|line| line.starts_with("DelegateControllers="))
            .map(|line| line.trim_start_matches("DelegateControllers="))
            .unwrap();
        let controllers = controllers.split(' ').collect::<Vec<&str>>();
        assert!(
            controllers.contains(&"cpu"),
            "DelegateControllers cpu not set"
        );
        assert!(
            controllers.contains(&"cpuset"),
            "DelegateControllers cpuset not set"
        );
        if v2 {
            assert!(
                controllers.contains(&"io"),
                "DelegateControllers io not set"
            );
        } else {
            assert!(
                controllers.contains(&"blkio"),
                "DelegateControllers blkio not set"
            );
        }
        assert!(
            controllers.contains(&"memory"),
            "DelegateControllers memory not set"
        );
        assert!(
            controllers.contains(&"pids"),
            "DelegateControllers pids not set"
        );

        // CPUAccounting
        assert!(
            output.lines().any(|line| line == "CPUAccounting=yes"),
            "CPUAccounting not set"
        );
        // IOAccounting for v2, and BlockIOAccounting for v1
        if v2 {
            assert!(
                output.lines().any(|line| line == "IOAccounting=yes"),
                "IOAccounting not set"
            );
        } else {
            assert!(
                output.lines().any(|line| line == "BlockIOAccounting=yes"),
                "BlockIOAccounting not set"
            );
        }
        // MemoryAccounting
        assert!(
            output.lines().any(|line| line == "MemoryAccounting=yes"),
            "MemoryAccounting not set"
        );
        // TasksAccounting
        assert!(
            output.lines().any(|line| line == "TasksAccounting=yes"),
            "TasksAccounting not set"
        );
        // ActiveState
        assert!(
            output.lines().any(|line| line == "ActiveState=active"),
            "Unit is not active"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_stop() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Check ActiveState: expected to be "active"
        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line == "ActiveState=active"),
            "Unit is not active"
        );

        stop_cgroup(&cgroup);

        // Check ActiveState: expected to be "inactive"
        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line == "ActiveState=inactive"),
            "Unit is not inactive"
        );

        child.wait().unwrap();
    }

    #[test]
    fn test_set_properties() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let mut cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line
                == format!(
                    "Description={} {}:{}",
                    DEFAULT_DESCRIPTION, TEST_SLICE, unit
                )),
            "Initial description not set correctly"
        );

        let properties = [(
            DESCRIPTION,
            Value::Str("kata-container1 description".into()),
        )];
        cgroup.set_properties(&properties).unwrap();
        // The locally cached properties must reflect the update as well.
        assert!(cgroup.props.iter().any(|(k, v)| {
            k == &DESCRIPTION && v == &Value::Str("kata-container1 description".into())
        }));

        let output = systemd_show(&cgroup.unit);
        assert!(
            output
                .lines()
                .any(|line| line == "Description=kata-container1 description"),
            "Updated description not set correctly"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_freeze_and_thaw() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_yes();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Freeze the unit
        cgroup.freeze().unwrap();

        let pid = child.id() as u64;

        let stat_path = format!("/proc/{}/stat", pid);
        let content = fs::read_to_string(&stat_path).unwrap();
        // The process state is the third field, e.g.:
        // 1234 (bash) S 1233 ...
        //             ^
        let mut content_iter = content.split_whitespace();
        assert_eq!(
            content_iter.nth(2).unwrap(),
            "S",
            "Process should be in 'S' (sleeping) state after freezing"
        );

        // Thaw the unit
        cgroup.thaw().unwrap();

        // No more S now
        let content = fs::read_to_string(&stat_path).unwrap();
        let mut content_iter = content.split_whitespace();
        assert_ne!(
            content_iter.nth(2).unwrap(),
            "S",
            "Process should not be in 'S' (sleeping) state after thawing"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_exists() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        assert!(cgroup.exists(), "Cgroup should exist after starting");

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_add_process() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        let mut child1 = spawn_sleep_inf();
        let pid1 = CgroupPid::from(child1.id() as u64);
        cgroup.add_process(pid1, "/").unwrap();

        // Use the same hierarchy-aware path as `test_start`, so the check
        // also works when the host is not in cgroup v2 unified mode.
        let cgroup_procs_path = format!("{}/{}/cgroup.procs", test_slice_path(), unit);
        assert_pid_in_cgroup_procs(&cgroup_procs_path, &child1.id().to_string());

        stop_cgroup(&cgroup);
        child.wait().unwrap();
        child1.wait().unwrap();
    }
}