// libcgroups/systemd/manager.rs

1use std::collections::HashMap;
2use std::convert::Infallible;
3use std::fmt::{Debug, Display};
4use std::fs::{self};
5use std::path::Component::RootDir;
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use nix::NixPath;
10use nix::unistd::Pid;
11
12use super::controller::Controller;
13use super::controller_type::{CONTROLLER_TYPES, ControllerType};
14use super::cpu::Cpu;
15use super::cpuset::CpuSet;
16use super::dbus_native::client::SystemdClient;
17use super::dbus_native::dbus::DbusConnection;
18use super::dbus_native::utils::{DbusError, SystemdClientError};
19use super::memory::Memory;
20use super::pids::Pids;
21use crate::common::{
22    self, AnyCgroupManager, CgroupManager, ControllerOpt, FreezerState, JoinSafelyError,
23    PathBufExt, WrapIoResult, WrappedIoError,
24};
25use crate::stats::Stats;
26use crate::systemd::dbus_native::serialize::Variant;
27use crate::systemd::io::Io;
28use crate::systemd::unified::Unified;
29use crate::v2::manager::{Manager as FsManager, V2ManagerError};
30
/// cgroup v2 interface file read to discover which controllers a cgroup offers.
const CGROUP_CONTROLLERS: &str = "cgroup.controllers";
/// cgroup v2 interface file written with `+<controller>` entries to enable controllers.
const CGROUP_SUBTREE_CONTROL: &str = "cgroup.subtree_control";
/// Five-second timeout that callers can pass to `Manager::new` as the time to wait for a
/// freshly attached process to show up in its cgroup.
pub const PROCESS_IN_CGROUP_TIMEOUT_DURATION: Duration = Duration::from_millis(5_000);
34
35pub struct Manager {
36    /// Root path of the cgroup hierarchy e.g. /sys/fs/cgroup
37    root_path: PathBuf,
38    /// Path relative to the root path e.g. /system.slice/youki-569d5ce3afe1074769f67.scope for rootfull containers
39    /// and e.g. /user.slice/user-1000/user@1000.service/youki-569d5ce3afe1074769f67.scope for rootless containers
40    cgroups_path: PathBuf,
41    /// Combination of root path and cgroups path
42    full_path: PathBuf,
43    /// Destructured cgroups path as specified in the runtime spec e.g. system.slice:youki:569d5ce3afe1074769f67
44    destructured_path: CgroupsPath,
45    /// Name of the container e.g. 569d5ce3afe1074769f67
46    container_name: String,
47    /// Name of the systemd unit e.g. youki-569d5ce3afe1074769f67.scope
48    unit_name: String,
49    /// Client for communicating with systemd
50    client: DbusConnection,
51    /// Cgroup manager for the created transient unit
52    fs_manager: FsManager,
53    /// Last control group which is managed by systemd, e.g. /user.slice/user-1000/user@1000.service
54    delegation_boundary: PathBuf,
55    /// Duration to wait for a specific PID to be added to a cgroup
56    cgroup_wait_timeout_duration: Duration,
57}
58
/// A systemd-style cgroups path, written `[slice]:[scope_prefix]:[name]`.
///
/// `parent` holds the slice, which must be expanded into a filesystem path (see
/// `Manager::expand_slice`); `prefix` and `name` are used to derive the unit name.
#[derive(Debug)]
struct CgroupsPath {
    /// Parent slice, e.g. `system.slice`; empty when the path did not name one.
    parent: String,
    /// Unit name prefix, e.g. `youki` or `docker`.
    prefix: String,
    /// Container-specific part of the unit name.
    name: String,
}
69
70#[derive(thiserror::Error, Debug)]
71pub enum CgroupsPathError {
72    #[error("no cgroups path has been provided")]
73    NoPath,
74    #[error("cgroups path does not contain valid utf8")]
75    InvalidUtf8(PathBuf),
76    #[error("cgroups path is malformed: {0}")]
77    MalformedPath(PathBuf),
78}
79
80impl TryFrom<&Path> for CgroupsPath {
81    type Error = CgroupsPathError;
82
83    fn try_from(cgroups_path: &Path) -> Result<Self, Self::Error> {
84        // if cgroups_path was provided it should be of the form [slice]:[prefix]:[name],
85        // for example: "system.slice:docker:1234".
86        if cgroups_path.len() == 0 {
87            return Err(CgroupsPathError::NoPath);
88        }
89
90        let parts = cgroups_path
91            .to_str()
92            .ok_or_else(|| CgroupsPathError::InvalidUtf8(cgroups_path.to_path_buf()))?
93            .split(':')
94            .collect::<Vec<&str>>();
95
96        let destructured_path = match parts.len() {
97            2 => CgroupsPath {
98                parent: "".to_owned(),
99                prefix: parts[0].to_owned(),
100                name: parts[1].to_owned(),
101            },
102            3 => CgroupsPath {
103                parent: parts[0].to_owned(),
104                prefix: parts[1].to_owned(),
105                name: parts[2].to_owned(),
106            },
107            _ => return Err(CgroupsPathError::MalformedPath(cgroups_path.to_path_buf())),
108        };
109
110        Ok(destructured_path)
111    }
112}
113
114impl Display for CgroupsPath {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        write!(f, "{}:{}:{}", self.parent, self.prefix, self.name)
117    }
118}
119
120/// ensures that a parent unit for the current unit is specified
121fn ensure_parent_unit(cgroups_path: &mut CgroupsPath, use_system: bool) {
122    if cgroups_path.parent.is_empty() {
123        cgroups_path.parent = match use_system {
124            true => "system.slice".to_owned(),
125            false => "user.slice".to_owned(),
126        }
127    }
128}
129
130// custom debug impl as Manager contains fields that do not implement Debug
131// and therefore Debug cannot be derived
132impl Debug for Manager {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct("Manager")
135            .field("root_path", &self.root_path)
136            .field("cgroups_path", &self.cgroups_path)
137            .field("full_path", &self.full_path)
138            .field("destructured_path", &self.destructured_path)
139            .field("container_name", &self.container_name)
140            .field("unit_name", &self.unit_name)
141            .finish()
142    }
143}
144
145#[derive(thiserror::Error, Debug)]
146pub enum SystemdManagerError {
147    #[error("io error: {0}")]
148    WrappedIo(#[from] WrappedIoError),
149    #[error("failed to destructure cgroups path: {0}")]
150    CgroupsPath(#[from] CgroupsPathError),
151    #[error("invalid slice name: {0}")]
152    InvalidSliceName(String),
153    #[error(transparent)]
154    SystemdClient(#[from] SystemdClientError),
155    #[error("failed to join safely: {0}")]
156    JoinSafely(#[from] JoinSafelyError),
157    #[error("file not found: {0}")]
158    FileNotFound(PathBuf),
159    #[error("bad delegation boundary {boundary} for cgroups path {cgroup}")]
160    BadDelegationBoundary { boundary: PathBuf, cgroup: PathBuf },
161    #[error("in v2 manager: {0}")]
162    V2Manager(#[from] V2ManagerError),
163
164    #[error("Timeout waiting for pid {0} to be added to cgroup")]
165    WaitForProcessInCgroupTimeout(String),
166
167    #[error("in cpu controller: {0}")]
168    Cpu(#[from] super::cpu::SystemdCpuError),
169    #[error("in cpuset controller: {0}")]
170    CpuSet(#[from] super::cpuset::SystemdCpuSetError),
171    #[error("in io controller: {0}")]
172    Io(#[from] super::io::SystemdIoError),
173    #[error("in memory controller: {0}")]
174    Memory(#[from] super::memory::SystemdMemoryError),
175    #[error("in pids controller: {0}")]
176    Pids(Infallible),
177    #[error("in pids unified controller: {0}")]
178    Unified(#[from] super::unified::SystemdUnifiedError),
179}
180
181impl SystemdManagerError {
182    pub fn is_ebusy(&self) -> bool {
183        matches!(
184            self,
185            SystemdManagerError::SystemdClient(SystemdClientError::DBus(
186                DbusError::DeviceOrResourceBusy(_)
187            ))
188        )
189    }
190}
191
192impl Manager {
193    pub fn new(
194        root_path: PathBuf,
195        cgroups_path: PathBuf,
196        container_name: String,
197        use_system: bool,
198        cgroup_wait_timeout_duration: Duration,
199    ) -> Result<Self, SystemdManagerError> {
200        let mut destructured_path: CgroupsPath = cgroups_path.as_path().try_into()?;
201        ensure_parent_unit(&mut destructured_path, use_system);
202
203        let client = match use_system {
204            true => DbusConnection::new_system()?,
205            false => DbusConnection::new_session()?,
206        };
207
208        let (cgroups_path, delegation_boundary) =
209            Self::construct_cgroups_path(&destructured_path, &client)?;
210        let full_path = root_path.join_safely(&cgroups_path)?;
211        let fs_manager = FsManager::new(root_path.clone(), cgroups_path.clone())?;
212
213        Ok(Manager {
214            root_path,
215            cgroups_path,
216            full_path,
217            container_name,
218            unit_name: Self::get_unit_name(&destructured_path),
219            destructured_path,
220            client,
221            fs_manager,
222            delegation_boundary,
223            cgroup_wait_timeout_duration,
224        })
225    }
226
227    /// get_unit_name returns the unit (scope) name from the path provided by the user
228    /// for example: foo:docker:bar returns in '/docker-bar.scope'
229    fn get_unit_name(cgroups_path: &CgroupsPath) -> String {
230        // By default we create a scope unless specified explicitly.
231        if !cgroups_path.name.ends_with(".slice") {
232            return format!("{}-{}.scope", cgroups_path.prefix, cgroups_path.name);
233        }
234        cgroups_path.name.clone()
235    }
236
237    // get_cgroups_path generates a cgroups path from the one provided by the user via cgroupsPath.
238    // an example of the final path: "/system.slice/youki-569d5ce3afe1074769f67.scope" or if we are
239    // not running as root /user.slice/user-1000/user@1000.service/youki-569d5ce3afe1074769f67.scope
240    fn construct_cgroups_path(
241        cgroups_path: &CgroupsPath,
242        client: &dyn SystemdClient,
243    ) -> Result<(PathBuf, PathBuf), SystemdManagerError> {
244        // if the user provided a '.slice' (as in a branch of a tree)
245        // we need to convert it to a filesystem path.
246
247        let parent = Self::expand_slice(&cgroups_path.parent)?;
248        let systemd_root = client.control_cgroup_root()?;
249        let unit_name = Self::get_unit_name(cgroups_path);
250
251        let cgroups_path = systemd_root.join_safely(parent)?.join_safely(unit_name)?;
252        Ok((cgroups_path, systemd_root))
253    }
254
255    // systemd represents slice hierarchy using `-`, so we need to follow suit when
256    // generating the path of slice. For example, 'test-a-b.slice' becomes
257    // '/test.slice/test-a.slice/test-a-b.slice'.
258    fn expand_slice(slice: &str) -> Result<PathBuf, SystemdManagerError> {
259        let suffix = ".slice";
260        if slice.len() <= suffix.len() || !slice.ends_with(suffix) {
261            return Err(SystemdManagerError::InvalidSliceName(slice.into()));
262        }
263        if slice.contains('/') {
264            return Err(SystemdManagerError::InvalidSliceName(slice.into()));
265        }
266        let mut path = "".to_owned();
267        let mut prefix = "".to_owned();
268        let slice_name = slice.trim_end_matches(suffix);
269        // if input was -.slice, we should just return root now
270        if slice_name == "-" {
271            return Ok(Path::new("/").to_path_buf());
272        }
273        for component in slice_name.split('-') {
274            if component.is_empty() {
275                return Err(SystemdManagerError::InvalidSliceName(slice.into()));
276            }
277            // Append the component to the path and to the prefix.
278            path = format!("{path}/{prefix}{component}{suffix}");
279            prefix = format!("{prefix}{component}-");
280        }
281        Ok(Path::new(&path).to_path_buf())
282    }
283
284    /// ensures that each level in the downward path from the delegation boundary down to
285    /// the scope or slice of the transient unit has all available controllers enabled
286    fn ensure_controllers_attached(&self) -> Result<(), SystemdManagerError> {
287        let full_boundary_path = self.root_path.join_safely(&self.delegation_boundary)?;
288
289        let controllers: Vec<String> = self
290            .get_available_controllers(&full_boundary_path)?
291            .into_iter()
292            .map(|c| format!("{}{}", "+", c))
293            .collect();
294
295        Self::write_controllers(&full_boundary_path, &controllers)?;
296
297        let mut current_path = full_boundary_path;
298        let mut components = self
299            .cgroups_path
300            .strip_prefix(&self.delegation_boundary)
301            .map_err(|_| SystemdManagerError::BadDelegationBoundary {
302                boundary: self.delegation_boundary.clone(),
303                cgroup: self.cgroups_path.clone(),
304            })?
305            .components()
306            .filter(|c| c.ne(&RootDir))
307            .peekable();
308        // Verify that *each level* in the downward path from the root cgroup
309        // down to the cgroup_path provided by the user is a valid cgroup hierarchy.
310        // containing the attached controllers.
311        while let Some(component) = components.next() {
312            current_path = current_path.join(component);
313            if !current_path.exists() {
314                tracing::warn!(
315                    "{:?} does not exist. Resource restrictions might not work correctly",
316                    current_path
317                );
318                return Ok(());
319            }
320
321            // last component cannot have subtree_control enabled due to internal process constraint
322            // if this were set, writing to the cgroups.procs file will fail with Erno 16 (device or resource busy)
323            if components.peek().is_some() {
324                Self::write_controllers(&current_path, &controllers)?;
325            }
326        }
327
328        Ok(())
329    }
330
331    fn wait_for_process_in_cgroup(&self, pid: Pid) -> Result<(), SystemdManagerError> {
332        let start = Instant::now();
333        while start.elapsed() < self.cgroup_wait_timeout_duration {
334            // If it fails, it most likely means that the cgroup hasn't been set up yet.
335            let result = self.fs_manager.get_all_pids();
336            if let Ok(pids) = result {
337                if pids.contains(&pid) {
338                    tracing::info!("Process {} successfully added to cgroup", pid);
339                    return Ok(());
340                }
341            } else if let Err(e) = result {
342                if let V2ManagerError::WrappedIo(ref wrapped_io_error) = e {
343                    if !matches!(wrapped_io_error, WrappedIoError::Read { .. }) {
344                        return Err(e.into());
345                    }
346                } else {
347                    return Err(e.into());
348                }
349            }
350
351            std::thread::sleep(Duration::from_millis(20));
352        }
353        Err(SystemdManagerError::WaitForProcessInCgroupTimeout(
354            pid.to_string(),
355        ))
356    }
357
358    fn get_available_controllers<P: AsRef<Path>>(
359        &self,
360        cgroups_path: P,
361    ) -> Result<Vec<ControllerType>, SystemdManagerError> {
362        let controllers_path = self.root_path.join(cgroups_path).join(CGROUP_CONTROLLERS);
363        if !controllers_path.exists() {
364            return Err(SystemdManagerError::FileNotFound(controllers_path));
365        }
366
367        let mut controllers = Vec::new();
368        for controller in fs::read_to_string(&controllers_path)
369            .wrap_read(controllers_path)?
370            .split_whitespace()
371        {
372            match controller {
373                "cpu" => controllers.push(ControllerType::Cpu),
374                "memory" => controllers.push(ControllerType::Memory),
375                "pids" => controllers.push(ControllerType::Pids),
376                _ => continue,
377            }
378        }
379
380        Ok(controllers)
381    }
382
383    fn write_controllers(path: &Path, controllers: &[String]) -> Result<(), SystemdManagerError> {
384        for controller in controllers {
385            common::write_cgroup_file_str(path.join(CGROUP_SUBTREE_CONTROL), controller)?;
386        }
387
388        Ok(())
389    }
390
391    pub fn any(self) -> AnyCgroupManager {
392        AnyCgroupManager::Systemd(Box::new(self))
393    }
394}
395
396impl CgroupManager for Manager {
397    type Error = SystemdManagerError;
398
399    fn add_task(&self, pid: Pid) -> Result<(), Self::Error> {
400        // Dont attach any pid to the cgroup if -1 is specified as a pid
401        if pid.as_raw() == -1 {
402            return Ok(());
403        }
404        if self.client.transient_unit_exists(&self.unit_name) {
405            tracing::debug!("Transient unit {:?} already exists", self.unit_name);
406            self.client
407                .add_process_to_unit(&self.unit_name, "", pid.as_raw() as u32)?;
408            return Ok(());
409        }
410
411        tracing::debug!("Starting {:?}", self.unit_name);
412        self.client.start_transient_unit(
413            &self.container_name,
414            pid.as_raw() as u32,
415            &self.destructured_path.parent,
416            &self.unit_name,
417        )?;
418
419        // There is a chance that the intermediate process ends before systemd gets the dbus message to add it to transit unit.
420        self.wait_for_process_in_cgroup(pid)?;
421
422        Ok(())
423    }
424
425    fn apply(&self, controller_opt: &ControllerOpt) -> Result<(), Self::Error> {
426        let mut properties: HashMap<&str, Variant> = HashMap::new();
427        let systemd_version = self.client.systemd_version()?;
428
429        for controller in CONTROLLER_TYPES {
430            match controller {
431                ControllerType::Cpu => {
432                    Cpu::apply(controller_opt, systemd_version, &mut properties)?;
433                }
434
435                ControllerType::CpuSet => {
436                    CpuSet::apply(controller_opt, systemd_version, &mut properties)?;
437                }
438
439                ControllerType::Pids => {
440                    Pids::apply(controller_opt, systemd_version, &mut properties)
441                        .map_err(SystemdManagerError::Pids)?;
442                }
443                ControllerType::Memory => {
444                    Memory::apply(controller_opt, systemd_version, &mut properties)?;
445                }
446                ControllerType::Io => {
447                    Io::apply(controller_opt, systemd_version, &mut properties)?;
448                }
449            };
450        }
451
452        tracing::debug!("applying properties {:?}", properties);
453        Unified::apply(controller_opt, systemd_version, &mut properties)?;
454
455        if !properties.is_empty() {
456            self.ensure_controllers_attached()?;
457            self.client
458                .set_unit_properties(&self.unit_name, &properties)?;
459        }
460
461        Ok(())
462    }
463
464    fn remove(&self) -> Result<(), Self::Error> {
465        tracing::debug!("remove {}", self.unit_name);
466        if self.client.transient_unit_exists(&self.unit_name) {
467            self.client.stop_transient_unit(&self.unit_name)?;
468        }
469
470        Ok(())
471    }
472
473    fn freeze(&self, state: FreezerState) -> Result<(), Self::Error> {
474        Ok(self.fs_manager.freeze(state)?)
475    }
476
477    fn stats(&self) -> Result<Stats, Self::Error> {
478        Ok(self.fs_manager.stats()?)
479    }
480
481    fn get_all_pids(&self) -> Result<Vec<Pid>, Self::Error> {
482        Ok(common::get_all_pids(&self.full_path)?)
483    }
484}
485
#[cfg(test)]
mod tests {
    use anyhow::{Context, Result};

    use super::*;
    use crate::common::DEFAULT_CGROUP_ROOT;
    use crate::systemd::dbus_native::client::SystemdClient;
    use crate::systemd::dbus_native::serialize::Variant;
    use crate::systemd::dbus_native::utils::SystemdClientError;

    /// Fake systemd client. Every operation succeeds, units always "exist", the
    /// reported systemd version is 245, and the control group root is `/`.
    struct TestSystemdClient;

    impl SystemdClient for TestSystemdClient {
        fn is_system(&self) -> bool {
            true
        }

        fn transient_unit_exists(&self, _: &str) -> bool {
            true
        }

        fn start_transient_unit(
            &self,
            _container_name: &str,
            _pid: u32,
            _parent: &str,
            _unit_name: &str,
        ) -> Result<(), SystemdClientError> {
            Ok(())
        }

        fn stop_transient_unit(&self, _unit_name: &str) -> Result<(), SystemdClientError> {
            Ok(())
        }

        fn set_unit_properties(
            &self,
            _unit_name: &str,
            _properties: &HashMap<&str, Variant>,
        ) -> Result<(), SystemdClientError> {
            Ok(())
        }

        fn systemd_version(&self) -> Result<u32, SystemdClientError> {
            Ok(245)
        }

        fn control_cgroup_root(&self) -> Result<PathBuf, SystemdClientError> {
            Ok(PathBuf::from("/"))
        }

        fn add_process_to_unit(
            &self,
            _unit_name: &str,
            _subcgroup: &str,
            _pid: u32,
        ) -> Result<(), SystemdClientError> {
            Ok(())
        }
    }

    /// Parses a `[slice]:[prefix]:[name]` string into a `CgroupsPath`.
    fn parse_cgroups_path(raw: &str) -> Result<CgroupsPath> {
        Path::new(raw).try_into().context("construct path")
    }

    /// Spawns a short-lived `sleep` child and returns it together with its pid.
    fn spawn_sleeper() -> (std::process::Child, Pid) {
        let child = std::process::Command::new("sleep")
            .arg("1s")
            .spawn()
            .unwrap();
        let pid = Pid::from_raw(child.id() as i32);
        (child, pid)
    }

    #[test]
    fn expand_slice_works() -> Result<()> {
        let expanded = Manager::expand_slice("test-a-b.slice")?;
        assert_eq!(
            expanded,
            PathBuf::from("/test.slice/test-a.slice/test-a-b.slice"),
        );
        Ok(())
    }

    #[test]
    fn get_cgroups_path_works_with_a_complex_slice() -> Result<()> {
        let cgroups_path = parse_cgroups_path("test-a-b.slice:docker:foo")?;
        let (path, _) = Manager::construct_cgroups_path(&cgroups_path, &TestSystemdClient)?;
        assert_eq!(
            path,
            PathBuf::from("/test.slice/test-a.slice/test-a-b.slice/docker-foo.scope"),
        );
        Ok(())
    }

    #[test]
    fn get_cgroups_path_works_with_a_simple_slice() -> Result<()> {
        let cgroups_path = parse_cgroups_path("machine.slice:libpod:foo")?;
        let (path, _) = Manager::construct_cgroups_path(&cgroups_path, &TestSystemdClient)?;
        assert_eq!(path, PathBuf::from("/machine.slice/libpod-foo.scope"));
        Ok(())
    }

    #[test]
    fn get_cgroups_path_works_without_parent() -> Result<()> {
        let mut cgroups_path = parse_cgroups_path(":docker:foo")?;
        ensure_parent_unit(&mut cgroups_path, true);

        let (path, _) = Manager::construct_cgroups_path(&cgroups_path, &TestSystemdClient)?;
        assert_eq!(path, PathBuf::from("/system.slice/docker-foo.scope"));
        Ok(())
    }

    #[test]
    fn test_task_addition() {
        let manager = Manager::new(
            DEFAULT_CGROUP_ROOT.into(),
            ":youki:test".into(),
            "youki_test_container".into(),
            false,
            PROCESS_IN_CGROUP_TIMEOUT_DURATION,
        )
        .unwrap();
        let (mut first, first_pid) = spawn_sleeper();
        let (mut second, second_pid) = spawn_sleeper();

        for pid in [first_pid, second_pid] {
            manager.add_task(pid).unwrap();
        }
        let all_pids = manager.get_all_pids().unwrap();
        assert!(all_pids.contains(&first_pid));
        assert!(all_pids.contains(&second_pid));

        // Wait until both processes have finished so the cgroup can be cleaned up.
        let _ = first.wait();
        let _ = second.wait();
        manager.remove().unwrap();
        // `remove` should already have removed the directory; retry as a contingency
        // and ignore the result.
        let _ = fs::remove_dir(&manager.full_path);
    }

    #[test]
    fn test_error_thrown_if_process_never_added_to_cgroup() -> Result<()> {
        let manager = Manager::new(
            DEFAULT_CGROUP_ROOT.into(),
            ":youki:test".into(),
            "youki_test_container".into(),
            false,
            Duration::from_secs(1),
        )
        .unwrap();

        // A pid that can never show up in the cgroup.
        let bogus_pid = Pid::from_raw(-1_i32);

        let outcome = manager.wait_for_process_in_cgroup(bogus_pid);

        assert!(matches!(
            outcome,
            Err(SystemdManagerError::WaitForProcessInCgroupTimeout(..))
        ));
        Ok(())
    }
}