1use std::collections::HashMap;
2use std::convert::Infallible;
3use std::fmt::{Debug, Display};
4use std::fs::{self};
5use std::path::Component::RootDir;
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use nix::NixPath;
10use nix::unistd::Pid;
11
12use super::controller::Controller;
13use super::controller_type::{CONTROLLER_TYPES, ControllerType};
14use super::cpu::Cpu;
15use super::cpuset::CpuSet;
16use super::dbus_native::client::SystemdClient;
17use super::dbus_native::dbus::DbusConnection;
18use super::dbus_native::utils::{DbusError, SystemdClientError};
19use super::memory::Memory;
20use super::pids::Pids;
21use crate::common::{
22 self, AnyCgroupManager, CgroupManager, ControllerOpt, FreezerState, JoinSafelyError,
23 PathBufExt, WrapIoResult, WrappedIoError,
24};
25use crate::stats::Stats;
26use crate::systemd::dbus_native::serialize::Variant;
27use crate::systemd::io::Io;
28use crate::systemd::unified::Unified;
29use crate::v2::manager::{Manager as FsManager, V2ManagerError};
30
/// File name, relative to a cgroup directory, that is read to discover which
/// controllers are available in that cgroup.
const CGROUP_CONTROLLERS: &str = "cgroup.controllers";
/// File name, relative to a cgroup directory, that controller entries are written to.
const CGROUP_SUBTREE_CONTROL: &str = "cgroup.subtree_control";
/// Five-second wait budget exported for callers that need a timeout for a process
/// to show up in its cgroup; the tests in this file pass it to `Manager::new`.
pub const PROCESS_IN_CGROUP_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
34
/// Cgroup manager that creates and configures a systemd unit over D-Bus and falls
/// back to the v2 filesystem manager for freezing, stats and pid polling.
pub struct Manager {
    /// Root path handed to `Manager::new`; cgroup paths are joined onto it.
    root_path: PathBuf,
    /// Path of the unit's cgroup as built by `construct_cgroups_path`
    /// (systemd control group root + expanded parent slice + unit name).
    cgroups_path: PathBuf,
    /// `root_path` joined with `cgroups_path`.
    full_path: PathBuf,
    /// The `parent:prefix:name` components parsed from the requested cgroups path.
    destructured_path: CgroupsPath,
    /// Container name forwarded to systemd when the transient unit is started.
    container_name: String,
    /// Unit name derived from `destructured_path` by `get_unit_name`.
    unit_name: String,
    /// D-Bus connection (system or session bus, chosen in `Manager::new`).
    client: DbusConnection,
    /// Filesystem based v2 manager rooted at the same cgroup.
    fs_manager: FsManager,
    /// Control group root reported by the client; controllers are enabled from
    /// this directory down towards the unit's cgroup.
    delegation_boundary: PathBuf,
    /// Upper bound on how long `wait_for_process_in_cgroup` keeps polling.
    cgroup_wait_timeout_duration: Duration,
}
58
/// A cgroups path split on `:` into its `parent`, `prefix` and `name` parts.
#[derive(Debug)]
struct CgroupsPath {
    /// Parent slice; empty when the input only had two parts, in which case
    /// `ensure_parent_unit` fills in a default.
    parent: String,
    /// Prefix used as the first half of a `<prefix>-<name>.scope` unit name.
    prefix: String,
    /// Name of the unit; used verbatim as the unit name when it ends in `.slice`.
    name: String,
}
69
/// Errors produced while parsing a cgroups path into a `CgroupsPath`.
#[derive(thiserror::Error, Debug)]
pub enum CgroupsPathError {
    /// The supplied path was empty.
    #[error("no cgroups path has been provided")]
    NoPath,
    /// The supplied path could not be viewed as UTF-8. The offending path is
    /// carried in the variant but is not part of the rendered message.
    #[error("cgroups path does not contain valid utf8")]
    InvalidUtf8(PathBuf),
    /// The supplied path did not split into exactly two or three `:` separated parts.
    #[error("cgroups path is malformed: {0}")]
    MalformedPath(PathBuf),
}
79
80impl TryFrom<&Path> for CgroupsPath {
81 type Error = CgroupsPathError;
82
83 fn try_from(cgroups_path: &Path) -> Result<Self, Self::Error> {
84 if cgroups_path.len() == 0 {
87 return Err(CgroupsPathError::NoPath);
88 }
89
90 let parts = cgroups_path
91 .to_str()
92 .ok_or_else(|| CgroupsPathError::InvalidUtf8(cgroups_path.to_path_buf()))?
93 .split(':')
94 .collect::<Vec<&str>>();
95
96 let destructured_path = match parts.len() {
97 2 => CgroupsPath {
98 parent: "".to_owned(),
99 prefix: parts[0].to_owned(),
100 name: parts[1].to_owned(),
101 },
102 3 => CgroupsPath {
103 parent: parts[0].to_owned(),
104 prefix: parts[1].to_owned(),
105 name: parts[2].to_owned(),
106 },
107 _ => return Err(CgroupsPathError::MalformedPath(cgroups_path.to_path_buf())),
108 };
109
110 Ok(destructured_path)
111 }
112}
113
114impl Display for CgroupsPath {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 write!(f, "{}:{}:{}", self.parent, self.prefix, self.name)
117 }
118}
119
120fn ensure_parent_unit(cgroups_path: &mut CgroupsPath, use_system: bool) {
122 if cgroups_path.parent.is_empty() {
123 cgroups_path.parent = match use_system {
124 true => "system.slice".to_owned(),
125 false => "user.slice".to_owned(),
126 }
127 }
128}
129
130impl Debug for Manager {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("Manager")
135 .field("root_path", &self.root_path)
136 .field("cgroups_path", &self.cgroups_path)
137 .field("full_path", &self.full_path)
138 .field("destructured_path", &self.destructured_path)
139 .field("container_name", &self.container_name)
140 .field("unit_name", &self.unit_name)
141 .finish()
142 }
143}
144
145#[derive(thiserror::Error, Debug)]
146pub enum SystemdManagerError {
147 #[error("io error: {0}")]
148 WrappedIo(#[from] WrappedIoError),
149 #[error("failed to destructure cgroups path: {0}")]
150 CgroupsPath(#[from] CgroupsPathError),
151 #[error("invalid slice name: {0}")]
152 InvalidSliceName(String),
153 #[error(transparent)]
154 SystemdClient(#[from] SystemdClientError),
155 #[error("failed to join safely: {0}")]
156 JoinSafely(#[from] JoinSafelyError),
157 #[error("file not found: {0}")]
158 FileNotFound(PathBuf),
159 #[error("bad delegation boundary {boundary} for cgroups path {cgroup}")]
160 BadDelegationBoundary { boundary: PathBuf, cgroup: PathBuf },
161 #[error("in v2 manager: {0}")]
162 V2Manager(#[from] V2ManagerError),
163
164 #[error("Timeout waiting for pid {0} to be added to cgroup")]
165 WaitForProcessInCgroupTimeout(String),
166
167 #[error("in cpu controller: {0}")]
168 Cpu(#[from] super::cpu::SystemdCpuError),
169 #[error("in cpuset controller: {0}")]
170 CpuSet(#[from] super::cpuset::SystemdCpuSetError),
171 #[error("in io controller: {0}")]
172 Io(#[from] super::io::SystemdIoError),
173 #[error("in memory controller: {0}")]
174 Memory(#[from] super::memory::SystemdMemoryError),
175 #[error("in pids controller: {0}")]
176 Pids(Infallible),
177 #[error("in pids unified controller: {0}")]
178 Unified(#[from] super::unified::SystemdUnifiedError),
179}
180
181impl SystemdManagerError {
182 pub fn is_ebusy(&self) -> bool {
183 matches!(
184 self,
185 SystemdManagerError::SystemdClient(SystemdClientError::DBus(
186 DbusError::DeviceOrResourceBusy(_)
187 ))
188 )
189 }
190}
191
192impl Manager {
193 pub fn new(
194 root_path: PathBuf,
195 cgroups_path: PathBuf,
196 container_name: String,
197 use_system: bool,
198 cgroup_wait_timeout_duration: Duration,
199 ) -> Result<Self, SystemdManagerError> {
200 let mut destructured_path: CgroupsPath = cgroups_path.as_path().try_into()?;
201 ensure_parent_unit(&mut destructured_path, use_system);
202
203 let client = match use_system {
204 true => DbusConnection::new_system()?,
205 false => DbusConnection::new_session()?,
206 };
207
208 let (cgroups_path, delegation_boundary) =
209 Self::construct_cgroups_path(&destructured_path, &client)?;
210 let full_path = root_path.join_safely(&cgroups_path)?;
211 let fs_manager = FsManager::new(root_path.clone(), cgroups_path.clone())?;
212
213 Ok(Manager {
214 root_path,
215 cgroups_path,
216 full_path,
217 container_name,
218 unit_name: Self::get_unit_name(&destructured_path),
219 destructured_path,
220 client,
221 fs_manager,
222 delegation_boundary,
223 cgroup_wait_timeout_duration,
224 })
225 }
226
227 fn get_unit_name(cgroups_path: &CgroupsPath) -> String {
230 if !cgroups_path.name.ends_with(".slice") {
232 return format!("{}-{}.scope", cgroups_path.prefix, cgroups_path.name);
233 }
234 cgroups_path.name.clone()
235 }
236
237 fn construct_cgroups_path(
241 cgroups_path: &CgroupsPath,
242 client: &dyn SystemdClient,
243 ) -> Result<(PathBuf, PathBuf), SystemdManagerError> {
244 let parent = Self::expand_slice(&cgroups_path.parent)?;
248 let systemd_root = client.control_cgroup_root()?;
249 let unit_name = Self::get_unit_name(cgroups_path);
250
251 let cgroups_path = systemd_root.join_safely(parent)?.join_safely(unit_name)?;
252 Ok((cgroups_path, systemd_root))
253 }
254
255 fn expand_slice(slice: &str) -> Result<PathBuf, SystemdManagerError> {
259 let suffix = ".slice";
260 if slice.len() <= suffix.len() || !slice.ends_with(suffix) {
261 return Err(SystemdManagerError::InvalidSliceName(slice.into()));
262 }
263 if slice.contains('/') {
264 return Err(SystemdManagerError::InvalidSliceName(slice.into()));
265 }
266 let mut path = "".to_owned();
267 let mut prefix = "".to_owned();
268 let slice_name = slice.trim_end_matches(suffix);
269 if slice_name == "-" {
271 return Ok(Path::new("/").to_path_buf());
272 }
273 for component in slice_name.split('-') {
274 if component.is_empty() {
275 return Err(SystemdManagerError::InvalidSliceName(slice.into()));
276 }
277 path = format!("{path}/{prefix}{component}{suffix}");
279 prefix = format!("{prefix}{component}-");
280 }
281 Ok(Path::new(&path).to_path_buf())
282 }
283
284 fn ensure_controllers_attached(&self) -> Result<(), SystemdManagerError> {
287 let full_boundary_path = self.root_path.join_safely(&self.delegation_boundary)?;
288
289 let controllers: Vec<String> = self
290 .get_available_controllers(&full_boundary_path)?
291 .into_iter()
292 .map(|c| format!("{}{}", "+", c))
293 .collect();
294
295 Self::write_controllers(&full_boundary_path, &controllers)?;
296
297 let mut current_path = full_boundary_path;
298 let mut components = self
299 .cgroups_path
300 .strip_prefix(&self.delegation_boundary)
301 .map_err(|_| SystemdManagerError::BadDelegationBoundary {
302 boundary: self.delegation_boundary.clone(),
303 cgroup: self.cgroups_path.clone(),
304 })?
305 .components()
306 .filter(|c| c.ne(&RootDir))
307 .peekable();
308 while let Some(component) = components.next() {
312 current_path = current_path.join(component);
313 if !current_path.exists() {
314 tracing::warn!(
315 "{:?} does not exist. Resource restrictions might not work correctly",
316 current_path
317 );
318 return Ok(());
319 }
320
321 if components.peek().is_some() {
324 Self::write_controllers(¤t_path, &controllers)?;
325 }
326 }
327
328 Ok(())
329 }
330
331 fn wait_for_process_in_cgroup(&self, pid: Pid) -> Result<(), SystemdManagerError> {
332 let start = Instant::now();
333 while start.elapsed() < self.cgroup_wait_timeout_duration {
334 let result = self.fs_manager.get_all_pids();
336 if let Ok(pids) = result {
337 if pids.contains(&pid) {
338 tracing::info!("Process {} successfully added to cgroup", pid);
339 return Ok(());
340 }
341 } else if let Err(e) = result {
342 if let V2ManagerError::WrappedIo(ref wrapped_io_error) = e {
343 if !matches!(wrapped_io_error, WrappedIoError::Read { .. }) {
344 return Err(e.into());
345 }
346 } else {
347 return Err(e.into());
348 }
349 }
350
351 std::thread::sleep(Duration::from_millis(20));
352 }
353 Err(SystemdManagerError::WaitForProcessInCgroupTimeout(
354 pid.to_string(),
355 ))
356 }
357
358 fn get_available_controllers<P: AsRef<Path>>(
359 &self,
360 cgroups_path: P,
361 ) -> Result<Vec<ControllerType>, SystemdManagerError> {
362 let controllers_path = self.root_path.join(cgroups_path).join(CGROUP_CONTROLLERS);
363 if !controllers_path.exists() {
364 return Err(SystemdManagerError::FileNotFound(controllers_path));
365 }
366
367 let mut controllers = Vec::new();
368 for controller in fs::read_to_string(&controllers_path)
369 .wrap_read(controllers_path)?
370 .split_whitespace()
371 {
372 match controller {
373 "cpu" => controllers.push(ControllerType::Cpu),
374 "memory" => controllers.push(ControllerType::Memory),
375 "pids" => controllers.push(ControllerType::Pids),
376 _ => continue,
377 }
378 }
379
380 Ok(controllers)
381 }
382
383 fn write_controllers(path: &Path, controllers: &[String]) -> Result<(), SystemdManagerError> {
384 for controller in controllers {
385 common::write_cgroup_file_str(path.join(CGROUP_SUBTREE_CONTROL), controller)?;
386 }
387
388 Ok(())
389 }
390
391 pub fn any(self) -> AnyCgroupManager {
392 AnyCgroupManager::Systemd(Box::new(self))
393 }
394}
395
396impl CgroupManager for Manager {
397 type Error = SystemdManagerError;
398
399 fn add_task(&self, pid: Pid) -> Result<(), Self::Error> {
400 if pid.as_raw() == -1 {
402 return Ok(());
403 }
404 if self.client.transient_unit_exists(&self.unit_name) {
405 tracing::debug!("Transient unit {:?} already exists", self.unit_name);
406 self.client
407 .add_process_to_unit(&self.unit_name, "", pid.as_raw() as u32)?;
408 return Ok(());
409 }
410
411 tracing::debug!("Starting {:?}", self.unit_name);
412 self.client.start_transient_unit(
413 &self.container_name,
414 pid.as_raw() as u32,
415 &self.destructured_path.parent,
416 &self.unit_name,
417 )?;
418
419 self.wait_for_process_in_cgroup(pid)?;
421
422 Ok(())
423 }
424
425 fn apply(&self, controller_opt: &ControllerOpt) -> Result<(), Self::Error> {
426 let mut properties: HashMap<&str, Variant> = HashMap::new();
427 let systemd_version = self.client.systemd_version()?;
428
429 for controller in CONTROLLER_TYPES {
430 match controller {
431 ControllerType::Cpu => {
432 Cpu::apply(controller_opt, systemd_version, &mut properties)?;
433 }
434
435 ControllerType::CpuSet => {
436 CpuSet::apply(controller_opt, systemd_version, &mut properties)?;
437 }
438
439 ControllerType::Pids => {
440 Pids::apply(controller_opt, systemd_version, &mut properties)
441 .map_err(SystemdManagerError::Pids)?;
442 }
443 ControllerType::Memory => {
444 Memory::apply(controller_opt, systemd_version, &mut properties)?;
445 }
446 ControllerType::Io => {
447 Io::apply(controller_opt, systemd_version, &mut properties)?;
448 }
449 };
450 }
451
452 tracing::debug!("applying properties {:?}", properties);
453 Unified::apply(controller_opt, systemd_version, &mut properties)?;
454
455 if !properties.is_empty() {
456 self.ensure_controllers_attached()?;
457 self.client
458 .set_unit_properties(&self.unit_name, &properties)?;
459 }
460
461 Ok(())
462 }
463
464 fn remove(&self) -> Result<(), Self::Error> {
465 tracing::debug!("remove {}", self.unit_name);
466 if self.client.transient_unit_exists(&self.unit_name) {
467 self.client.stop_transient_unit(&self.unit_name)?;
468 }
469
470 Ok(())
471 }
472
473 fn freeze(&self, state: FreezerState) -> Result<(), Self::Error> {
474 Ok(self.fs_manager.freeze(state)?)
475 }
476
477 fn stats(&self) -> Result<Stats, Self::Error> {
478 Ok(self.fs_manager.stats()?)
479 }
480
481 fn get_all_pids(&self) -> Result<Vec<Pid>, Self::Error> {
482 Ok(common::get_all_pids(&self.full_path)?)
483 }
484}
485
486#[cfg(test)]
487mod tests {
488 use anyhow::{Context, Result};
489
490 use super::*;
491 use crate::common::DEFAULT_CGROUP_ROOT;
492 use crate::systemd::dbus_native::client::SystemdClient;
493 use crate::systemd::dbus_native::serialize::Variant;
494 use crate::systemd::dbus_native::utils::SystemdClientError;
495
496 struct TestSystemdClient {}
497
498 impl SystemdClient for TestSystemdClient {
499 fn is_system(&self) -> bool {
500 true
501 }
502
503 fn transient_unit_exists(&self, _: &str) -> bool {
504 true
505 }
506
507 fn start_transient_unit(
508 &self,
509 _container_name: &str,
510 _pid: u32,
511 _parent: &str,
512 _unit_name: &str,
513 ) -> Result<(), SystemdClientError> {
514 Ok(())
515 }
516
517 fn stop_transient_unit(&self, _unit_name: &str) -> Result<(), SystemdClientError> {
518 Ok(())
519 }
520
521 fn set_unit_properties(
522 &self,
523 _unit_name: &str,
524 _properties: &HashMap<&str, Variant>,
525 ) -> Result<(), SystemdClientError> {
526 Ok(())
527 }
528
529 fn systemd_version(&self) -> Result<u32, SystemdClientError> {
530 Ok(245)
531 }
532
533 fn control_cgroup_root(&self) -> Result<PathBuf, SystemdClientError> {
534 Ok(PathBuf::from("/"))
535 }
536
537 fn add_process_to_unit(
538 &self,
539 _unit_name: &str,
540 _subcgroup: &str,
541 _pid: u32,
542 ) -> Result<(), SystemdClientError> {
543 Ok(())
544 }
545 }
546
/// A three-component slice expands into three nested slice directories.
#[test]
fn expand_slice_works() -> Result<()> {
    let expanded = Manager::expand_slice("test-a-b.slice")?;
    let expected = PathBuf::from("/test.slice/test-a.slice/test-a-b.slice");
    assert_eq!(expanded, expected);

    Ok(())
}
556
/// A nested parent slice is expanded and the scope unit is appended to it.
#[test]
fn get_cgroups_path_works_with_a_complex_slice() -> Result<()> {
    let parsed = CgroupsPath::try_from(Path::new("test-a-b.slice:docker:foo"))
        .context("construct path")?;

    let (unit_cgroup, _boundary) =
        Manager::construct_cgroups_path(&parsed, &TestSystemdClient {})?;
    assert_eq!(
        unit_cgroup,
        PathBuf::from("/test.slice/test-a.slice/test-a-b.slice/docker-foo.scope"),
    );

    Ok(())
}
570
/// A single-component parent slice yields one directory plus the scope unit.
#[test]
fn get_cgroups_path_works_with_a_simple_slice() -> Result<()> {
    let parsed = CgroupsPath::try_from(Path::new("machine.slice:libpod:foo"))
        .context("construct path")?;

    let (unit_cgroup, _boundary) =
        Manager::construct_cgroups_path(&parsed, &TestSystemdClient {})?;
    assert_eq!(
        unit_cgroup,
        PathBuf::from("/machine.slice/libpod-foo.scope"),
    );

    Ok(())
}
584
/// With an empty parent and `use_system = true` the unit lands in `system.slice`.
#[test]
fn get_cgroups_path_works_without_parent() -> Result<()> {
    let mut parsed =
        CgroupsPath::try_from(Path::new(":docker:foo")).context("construct path")?;
    ensure_parent_unit(&mut parsed, true);

    let (unit_cgroup, _boundary) =
        Manager::construct_cgroups_path(&parsed, &TestSystemdClient {})?;
    assert_eq!(
        unit_cgroup,
        PathBuf::from("/system.slice/docker-foo.scope"),
    );

    Ok(())
}
599
/// Adds two short-lived `sleep` processes through the manager and checks that both
/// are reported by `get_all_pids`. The unit is removed afterwards and the cgroup
/// directory is deleted on a best-effort basis.
#[test]
fn test_task_addition() {
    let manager = Manager::new(
        DEFAULT_CGROUP_ROOT.into(),
        ":youki:test".into(),
        "youki_test_container".into(),
        false,
        PROCESS_IN_CGROUP_TIMEOUT_DURATION,
    )
    .unwrap();

    let spawn_sleeper = || {
        std::process::Command::new("sleep")
            .arg("1s")
            .spawn()
            .unwrap()
    };
    let mut p1 = spawn_sleeper();
    let p1_id = nix::unistd::Pid::from_raw(p1.id() as i32);
    let mut p2 = spawn_sleeper();
    let p2_id = nix::unistd::Pid::from_raw(p2.id() as i32);

    manager.add_task(p1_id).unwrap();
    manager.add_task(p2_id).unwrap();

    let all_pids = manager.get_all_pids().unwrap();
    assert!(all_pids.contains(&p1_id));
    assert!(all_pids.contains(&p2_id));

    // Reap both children before tearing the unit down; their exit status is ignored.
    let _ = p1.wait();
    let _ = p2.wait();
    manager.remove().unwrap();
    let _ = fs::remove_dir(&manager.full_path);
}
633
/// Waiting for a pid that is never added to the cgroup ends in a timeout error.
#[test]
fn test_error_thrown_if_process_never_added_to_cgroup() -> Result<()> {
    let one_second = Duration::from_secs(1);
    let manager = Manager::new(
        DEFAULT_CGROUP_ROOT.into(),
        ":youki:test".into(),
        "youki_test_container".into(),
        false,
        one_second,
    )
    .unwrap();

    let never_added = nix::unistd::Pid::from_raw(-1_i32);
    let outcome = manager.wait_for_process_in_cgroup(never_added);

    assert!(matches!(
        outcome,
        Err(SystemdManagerError::WaitForProcessInCgroupTimeout(..))
    ));
    Ok(())
}
656}