// cgroups_rs/systemd/dbus/client.rs

// Copyright 2021-2023 Kata Contributors
// Copyright (c) 2025 Ant Group
//
// SPDX-License-Identifier: Apache-2.0 or MIT
//
6
7use zbus::zvariant::Value;
8use zbus::{Error as ZbusError, Result as ZbusResult};
9
10use crate::systemd::dbus::error::{Error, Result};
11use crate::systemd::dbus::proxy::systemd_manager_proxy;
12use crate::systemd::{Property, NO_SUCH_UNIT, PIDS, UNIT_MODE_REPLACE};
13use crate::CgroupPid;
14
15pub struct SystemdClient<'a> {
16    /// The name of the systemd unit (slice or scope)
17    unit: String,
18    props: Vec<Property<'a>>,
19}
20
21impl<'a> SystemdClient<'a> {
22    pub fn new(unit: &str, props: Vec<Property<'a>>) -> Result<Self> {
23        Ok(Self {
24            unit: unit.to_string(),
25            props,
26        })
27    }
28}
29
30impl SystemdClient<'_> {
31    /// Set the pid to the PIDs property of the unit.
32    ///
33    /// Append a process ID to the PIDs property of the unit. If not
34    /// exists, one property will be created.
35    pub fn set_pid_prop(&mut self, pid: CgroupPid) -> Result<()> {
36        if self.exists() {
37            return Ok(());
38        }
39
40        for prop in self.props.iter_mut() {
41            if prop.0 == PIDS {
42                // If PIDS is already set, we append the new pid to the existing list.
43                if let Value::Array(arr) = &mut prop.1 {
44                    arr.append(pid.pid.into())
45                        .map_err(|_| Error::InvalidProperties)?;
46                    return Ok(());
47                }
48                // Invalid type of PIDs
49                return Err(Error::InvalidProperties);
50            }
51        }
52        // If PIDS is not set, we create a new property.
53        self.props
54            .push((PIDS, Value::Array(vec![pid.pid as u32].into())));
55        Ok(())
56    }
57
58    /// Start a slice or a scope unit controlled and supervised by systemd.
59    ///
60    /// For more information, see:
61    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.unit.html
62    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.slice.html
63    /// https://www.freedesktop.org/software/systemd/man/latest/systemd.scope.html
64    pub fn start(&self) -> Result<()> {
65        // PIDs property must be present
66        if !self.props.iter().any(|(k, _)| k == &PIDS) {
67            return Err(Error::InvalidProperties);
68        }
69
70        let sys_proxy = systemd_manager_proxy()?;
71
72        let props_borrowed: Vec<(&str, &zbus::zvariant::Value)> =
73            self.props.iter().map(|(k, v)| (*k, v)).collect();
74        let props_borrowed: Vec<&(&str, &Value)> = props_borrowed.iter().collect();
75
76        sys_proxy.start_transient_unit(&self.unit, UNIT_MODE_REPLACE, &props_borrowed, &[])?;
77
78        Ok(())
79    }
80
81    /// Stop the current transient unit, the processes will be killed on
82    /// unit stop, see [1].
83    ///
84    /// 1. https://www.freedesktop.org/software/systemd/man/latest/systemd.kill.html#KillMode=
85    pub fn stop(&self) -> Result<()> {
86        let sys_proxy = systemd_manager_proxy()?;
87
88        let ret = sys_proxy.stop_unit(&self.unit, UNIT_MODE_REPLACE);
89        ignore_no_such_unit(ret)?;
90
91        // If we stop the unit and it still exists, it may be in a failed
92        // state, so we will try to reset it.
93        if self.exists() {
94            let ret = sys_proxy.reset_failed_unit(&self.unit);
95            ignore_no_such_unit(ret)?;
96        }
97
98        Ok(())
99    }
100
101    /// Set properties for the unit through dbus `SetUnitProperties`.
102    pub fn set_properties(&mut self, properties: &[Property<'static>]) -> Result<()> {
103        for prop in properties {
104            let new = prop.1.try_clone().map_err(|_| Error::InvalidProperties)?;
105            // Try to update the value first, if fails, append it.
106            if let Some(existing) = self.props.iter_mut().find(|p| p.0 == prop.0) {
107                existing.1 = new;
108            } else {
109                self.props.push((prop.0, new));
110            }
111        }
112
113        // The unit must exist before setting properties.
114        if !self.exists() {
115            return Ok(());
116        }
117
118        let sys_proxy = systemd_manager_proxy()?;
119
120        let props_borrowed: Vec<(&str, &Value)> = properties.iter().map(|(k, v)| (*k, v)).collect();
121        let props_borrowed: Vec<&(&str, &Value)> = props_borrowed.iter().collect();
122
123        sys_proxy.set_unit_properties(&self.unit, true, &props_borrowed)?;
124
125        Ok(())
126    }
127
128    /// Freeze the unit through dbus `FreezeUnit`.
129    pub fn freeze(&self) -> Result<()> {
130        let sys_proxy = systemd_manager_proxy()?;
131
132        sys_proxy.freeze_unit(&self.unit)?;
133
134        Ok(())
135    }
136
137    /// Thaw the frozen unit through dbus `ThawUnit`.
138    pub fn thaw(&self) -> Result<()> {
139        let sys_proxy = systemd_manager_proxy()?;
140
141        sys_proxy.thaw_unit(&self.unit)?;
142
143        Ok(())
144    }
145
146    /// Get the systemd version.
147    pub fn systemd_version(&self) -> Result<usize> {
148        let sys_proxy = systemd_manager_proxy()?;
149
150        // Parse 249 from "249.11-0ubuntu3.16"
151        let version = sys_proxy.version()?;
152        let version = version
153            .split('.')
154            .next()
155            .and_then(|v| v.parse::<usize>().ok())
156            .ok_or(Error::CorruptedSystemdVersion(version))?;
157
158        Ok(version)
159    }
160
161    /// Check if the unit exists.
162    pub fn exists(&self) -> bool {
163        let sys_proxy = match systemd_manager_proxy() {
164            Ok(proxy) => proxy,
165            _ => return false,
166        };
167
168        sys_proxy
169            .get_unit(&self.unit)
170            .map(|_| true)
171            .unwrap_or_default()
172    }
173
174    /// Add a process (tgid) to the unit through dbus
175    /// `AttachProcessesToUnit`.
176    pub fn add_process(&self, pid: CgroupPid, subcgroup: &str) -> Result<()> {
177        let sys_proxy = systemd_manager_proxy()?;
178
179        sys_proxy.attach_processes_to_unit(&self.unit, subcgroup, &[pid.pid as u32])?;
180
181        Ok(())
182    }
183}
184
185fn ignore_no_such_unit<T>(result: ZbusResult<T>) -> ZbusResult<bool> {
186    if let Err(ZbusError::MethodError(err_name, _, _)) = &result {
187        if err_name.as_str() == NO_SUCH_UNIT {
188            return Ok(true);
189        }
190    }
191    result.map(|_| false)
192}
193
#[cfg(test)]
pub mod tests {
    //! Unit tests for the SystemdClient
    //!
    //! These tests create real transient units through the host systemd
    //! and are skipped when systemd is not available.
    //!
    //! They are known to fail when run in parallel (root cause not yet
    //! understood) but pass reliably in serial, so run them with a single
    //! test thread:
    //!
    //! $ cargo test --package cgroups-rs --lib \
    //!   -- systemd::dbus::client::tests \
    //!   --show-output --test-threads=1

    use std::fs;
    use std::path::Path;
    use std::process::Command;
    use std::thread::sleep;
    use std::time::Duration;

    use rand::distributions::Alphanumeric;
    use rand::Rng;

    use crate::fs::hierarchies;
    use crate::systemd::dbus::client::*;
    use crate::systemd::props::PropertiesBuilder;
    use crate::systemd::utils::expand_slice;
    use crate::systemd::{DEFAULT_DESCRIPTION, DESCRIPTION, PIDS};
    use crate::tests::{spawn_sleep_inf, spawn_yes, systemd_version};

    const TEST_SLICE: &str = "cgroupsrs-test.slice";

    /// How many times `cgroup.procs` is read before a test gives up.
    const PROCS_ATTEMPTS: usize = 5;
    /// Pause between two consecutive reads of `cgroup.procs`.
    const PROCS_RETRY_INTERVAL: Duration = Duration::from_millis(500);

    /// Build a scope unit name with a random 5-character alphanumeric suffix.
    fn test_unit() -> String {
        let rand_string: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(5)
            .map(char::from)
            .collect();
        format!("cri-pod{}.scope", rand_string)
    }

    #[macro_export]
    macro_rules! skip_if_no_systemd {
        () => {
            if $crate::tests::systemd_version().is_none() {
                eprintln!("Test skipped, no systemd?");
                return;
            }
        };
    }

    /// Return the stdout of `systemctl show <unit>`.
    fn systemd_show(unit: &str) -> String {
        let output = Command::new("systemctl")
            .arg("show")
            .arg(unit)
            .output()
            .expect("Failed to execute systemctl show command");
        String::from_utf8_lossy(&output.stdout).to_string()
    }

    /// Filesystem directory of `TEST_SLICE`.
    ///
    /// - On cgroup v2 (unified mode) it lives directly under
    ///   `/sys/fs/cgroup`.
    /// - On cgroup v1 the memory controller hierarchy is used.
    fn test_slice_path(v2: bool) -> String {
        let base = expand_slice(TEST_SLICE).unwrap();
        if v2 {
            format!("/sys/fs/cgroup/{}", base)
        } else {
            format!("/sys/fs/cgroup/memory/{}", base)
        }
    }

    /// Poll `procs_path` until it lists `pid`.
    ///
    /// Reads the file up to `PROCS_ATTEMPTS` times, waiting
    /// `PROCS_RETRY_INTERVAL` between attempts, and panics if `pid` never
    /// appears. `cgroup.procs` holds one pid per line, so lines are compared
    /// exactly rather than by substring (pid `12` must not match `123`).
    fn wait_for_pid_in_procs(procs_path: &str, pid: u32) {
        let pid = pid.to_string();
        let mut last_read = None;
        for attempt in 0..PROCS_ATTEMPTS {
            if attempt > 0 {
                sleep(PROCS_RETRY_INTERVAL);
            }
            let read = fs::read_to_string(procs_path);
            if let Ok(content) = &read {
                if content.lines().any(|line| line.trim() == pid) {
                    return;
                }
            }
            last_read = Some(read);
        }
        panic!(
            "Cgroup procs {} does not contain process ID {} (last read: {:?})",
            procs_path, pid, last_read
        );
    }

    /// Start a unit in `TEST_SLICE` with the default cgroup properties.
    ///
    /// Any existing unit with the same name is stopped first. After the
    /// start, `pid` is attached to the unit's root cgroup.
    fn start_default_cgroup(pid: CgroupPid, unit: &str) -> SystemdClient<'_> {
        let mut props = PropertiesBuilder::default_cgroup(TEST_SLICE, unit).build();
        props.push((PIDS, Value::Array(vec![pid.pid as u32].into())));
        let cgroup = SystemdClient::new(unit, props).unwrap();
        // Stop the unit if it exists.
        cgroup.stop().unwrap();

        // Write the current process to the cgroup.
        cgroup.start().unwrap();
        cgroup.add_process(pid, "/").unwrap();
        cgroup
    }

    fn stop_cgroup(cgroup: &SystemdClient) {
        cgroup.stop().unwrap();
    }

    #[test]
    fn test_start() {
        skip_if_no_systemd!();

        let v2 = hierarchies::is_cgroup2_unified_mode();
        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Check if the cgroup exists in the filesystem
        let full_base = test_slice_path(v2);
        assert!(
            Path::new(&full_base).exists(),
            "Cgroup base path does not exist: {}",
            full_base
        );

        // PIDs
        let cgroup_procs_path = format!("{}/{}/cgroup.procs", full_base, &unit);
        wait_for_pid_in_procs(&cgroup_procs_path, child.id());

        // Check the unit from "systemctl show <unit>"
        let output = systemd_show(&cgroup.unit);

        // Slice
        assert!(
            output
                .lines()
                .any(|line| line == format!("Slice={}", TEST_SLICE)),
            "Slice not found"
        );
        // Delegate
        assert!(
            output.lines().any(|line| line == "Delegate=yes"),
            "Delegate not set"
        );
        // DelegateControllers
        // controllers: cpu cpuacct cpuset io blkio memory devices pids
        let controllers = output
            .lines()
            .find(|line| line.starts_with("DelegateControllers="))
            .map(|line| line.trim_start_matches("DelegateControllers="))
            .expect("DelegateControllers not found in systemctl show output");
        let controllers = controllers.split(' ').collect::<Vec<&str>>();
        assert!(
            controllers.contains(&"cpu"),
            "DelegateControllers cpu not set"
        );
        assert!(
            controllers.contains(&"cpuset"),
            "DelegateControllers cpuset not set"
        );
        if v2 {
            assert!(
                controllers.contains(&"io"),
                "DelegateControllers io not set"
            );
        } else {
            assert!(
                controllers.contains(&"blkio"),
                "DelegateControllers blkio not set"
            );
        }
        assert!(
            controllers.contains(&"memory"),
            "DelegateControllers memory not set"
        );
        assert!(
            controllers.contains(&"pids"),
            "DelegateControllers pids not set"
        );

        // CPUAccounting
        assert!(
            output.lines().any(|line| line == "CPUAccounting=yes"),
            "CPUAccounting not set"
        );
        // IOAccounting for v2, and BlockIOAccounting for v1
        if v2 {
            assert!(
                output.lines().any(|line| line == "IOAccounting=yes"),
                "IOAccounting not set"
            );
        } else {
            assert!(
                output.lines().any(|line| line == "BlockIOAccounting=yes"),
                "BlockIOAccounting not set"
            );
        }
        // MemoryAccounting
        assert!(
            output.lines().any(|line| line == "MemoryAccounting=yes"),
            "MemoryAccounting not set"
        );
        // TasksAccounting
        assert!(
            output.lines().any(|line| line == "TasksAccounting=yes"),
            "TasksAccounting not set"
        );
        // ActiveState
        assert!(
            output.lines().any(|line| line == "ActiveState=active"),
            "Unit is not active"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_stop() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Check ActiveState: expected to be "active"
        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line == "ActiveState=active"),
            "Unit is not active"
        );

        stop_cgroup(&cgroup);

        // Check ActiveState: expected to be "inactive"
        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line == "ActiveState=inactive"),
            "Unit is not inactive"
        );

        child.wait().unwrap();
    }

    #[test]
    fn test_set_properties() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let mut cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        let output = systemd_show(&cgroup.unit);
        assert!(
            output.lines().any(|line| line
                == format!(
                    "Description={} {}:{}",
                    DEFAULT_DESCRIPTION, TEST_SLICE, unit
                )),
            "Initial description not set correctly"
        );

        let properties = [(
            DESCRIPTION,
            Value::Str("kata-container1 description".into()),
        )];
        cgroup.set_properties(&properties).unwrap();
        assert!(cgroup.props.iter().any(|(k, v)| {
            k == &DESCRIPTION && v == &Value::Str("kata-container1 description".into())
        }));

        let output = systemd_show(&cgroup.unit);
        assert!(
            output
                .lines()
                .any(|line| line == "Description=kata-container1 description"),
            "Updated description not set correctly"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_freeze_and_thaw() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_yes();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        // Freeze the unit
        cgroup.freeze().unwrap();

        let pid = child.id() as u64;

        let stat_path = format!("/proc/{}/stat", pid);
        let content = fs::read_to_string(&stat_path).unwrap();
        // The process state is the third field, e.g.:
        // 1234 (bash) S 1233 ...
        //             ^
        let mut content_iter = content.split_whitespace();
        assert_eq!(
            content_iter.nth(2).unwrap(),
            "S",
            "Process should be in 'S' (sleeping) state after freezing"
        );

        // Thaw the unit
        cgroup.thaw().unwrap();

        // No more S now
        let content = fs::read_to_string(&stat_path).unwrap();
        let mut content_iter = content.split_whitespace();
        assert_ne!(
            content_iter.nth(2).unwrap(),
            "S",
            "Process should not be in 'S' (sleeping) state after thawing"
        );

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_systemd_version() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let props = PropertiesBuilder::default_cgroup(TEST_SLICE, &unit).build();
        let cgroup = SystemdClient::new(&unit, props).unwrap();
        let version = cgroup.systemd_version().unwrap();

        let expected_version = systemd_version().unwrap();
        assert_eq!(version, expected_version, "Systemd version mismatch");
    }

    #[test]
    fn test_exists() {
        skip_if_no_systemd!();

        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        assert!(cgroup.exists(), "Cgroup should exist after starting");

        stop_cgroup(&cgroup);
        child.wait().unwrap();
    }

    #[test]
    fn test_add_process() {
        skip_if_no_systemd!();

        let v2 = hierarchies::is_cgroup2_unified_mode();
        let unit = test_unit();
        let mut child = spawn_sleep_inf();
        let cgroup = start_default_cgroup(CgroupPid::from(child.id() as u64), &unit);

        let mut child1 = spawn_sleep_inf();
        let pid1 = CgroupPid::from(child1.id() as u64);
        cgroup.add_process(pid1, "/").unwrap();

        // Resolve the path per hierarchy: on cgroup v1 the unit lives under
        // the memory controller, not directly under /sys/fs/cgroup.
        let cgroup_procs_path = format!("{}/{}/cgroup.procs", test_slice_path(v2), unit);
        wait_for_pid_in_procs(&cgroup_procs_path, child1.id());

        stop_cgroup(&cgroup);
        child.wait().unwrap();
        child1.wait().unwrap();
    }
}