use crate::{
api::{self, model},
common::{container::Container, name::Name, non_nul_string::NonNulString, version::VersionReq},
npk::manifest::{
autostart::Autostart,
mount::{Mount, Resource},
Manifest,
},
runtime::{
cgroups,
config::{Config, RepositoryType},
console::{Console, Peer, Request},
env,
error::Error,
events::{CGroupEvent, ContainerEvent, Event, EventTx},
exit_status::ExitStatus,
fork::Forker,
io,
io::ContainerIo,
mount::MountControl,
persistence,
repository::{DirRepository, MemRepository, Npk, RepositoryId},
runtime::{NotificationTx, Pid},
sockets,
sockets::Sockets,
},
};
use anyhow::{Context, Result};
use bytes::Bytes;
use futures::{
future::{join_all, ready, Either},
Future, Stream, StreamExt, TryFutureExt,
};
use itertools::Itertools;
use log::{debug, error, info, warn};
use nix::sys::signal::Signal;
use std::{
collections::{HashMap, HashSet},
convert::TryFrom,
fmt::Debug,
iter::{once, FromIterator},
os::unix::{net::UnixStream as StdUnixStream, prelude::OwnedFd},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::{
fs,
net::UnixStream,
pin,
sync::{mpsc, oneshot},
task::{self},
time,
};
use tokio_util::sync::CancellationToken;
type Repository = Box<dyn super::repository::Repository + Send + Sync>;
#[derive(Debug)]
pub(super) struct State {
config: Config,
events_tx: EventTx,
notification_tx: NotificationTx,
mount_control: Arc<MountControl>,
forker: Forker,
containers: HashMap<Container, ContainerState>,
repositories: HashMap<RepositoryId, Repository>,
selinux_enabled: bool,
}
#[derive(Debug, Default)]
pub(super) struct ContainerState {
pub repository: RepositoryId,
pub root: Option<PathBuf>,
pub process: Option<ContainerContext>,
}
impl ContainerState {
pub fn is_mounted(&self) -> bool {
self.root.is_some()
}
}
#[derive(Debug)]
pub(super) struct ContainerContext {
pid: Pid,
started: time::Instant,
cgroups: cgroups::CGroups,
sockets: Sockets,
stop: CancellationToken,
resources: HashSet<Container>,
}
impl ContainerContext {
async fn destroy(self) {
self.stop.cancel();
self.cgroups.destroy().await;
self.sockets.destroy().await;
}
}
impl State {
pub(super) async fn new(
config: Config,
events_tx: EventTx,
notification_tx: NotificationTx,
forker: Forker,
) -> Result<State> {
let repositories = HashMap::new();
let containers = HashMap::new();
let selinux_enabled = is_selinux_enabled();
let mount_control = Arc::new(
MountControl::new(config.loop_device_timeout)
.await
.expect("failed to initialize mount control"),
);
let mut state = State {
events_tx,
notification_tx,
repositories,
containers,
config,
forker,
mount_control,
selinux_enabled,
};
let mount_repositories = state.initialize_repositories().await?;
state.automount(&mount_repositories).await?;
state.autostart().await?;
Ok(state)
}
async fn initialize_repositories(&mut self) -> Result<HashSet<RepositoryId>> {
let mut mount_repositories = HashSet::with_capacity(self.config.repositories.len());
for (id, repository) in &self.config.repositories {
if repository.mount_on_start {
mount_repositories.insert(id.clone());
}
let repository = match &repository.r#type {
RepositoryType::Fs { dir } => {
let repository = DirRepository::new(dir, repository).await?;
Box::new(repository) as Repository
}
RepositoryType::Memory => {
let repository = MemRepository::new(repository).await?;
Box::new(repository) as Repository
}
};
for npk in repository.containers() {
let name = npk.manifest().name.clone();
let version = npk.manifest().version.clone();
let container = Container::new(name, version);
if let Ok(state) = self.state(&container) {
warn!("Skipping duplicate container {} which is already loaded from repository {}", container, state.repository);
} else {
self.containers.insert(
container,
ContainerState {
repository: id.clone(),
..Default::default()
},
);
}
}
self.repositories.insert(id.clone(), repository);
}
Ok(mount_repositories)
}
async fn automount(&mut self, repositories: &HashSet<RepositoryId>) -> Result<()> {
if repositories.is_empty() {
return Ok(());
}
info!(
"Trying to mount containers from repository {}",
repositories.iter().join(", ")
);
let containers = self
.containers
.iter()
.filter(|(_, state)| repositories.contains(&state.repository))
.map(|(container, _)| container.clone())
.collect::<Vec<Container>>();
if !containers.is_empty() {
for result in self.mount_all(&containers).await {
result?;
}
}
Ok(())
}
async fn autostart(&mut self) -> Result<(), Error> {
let start = time::Instant::now();
let mut autostarts = Vec::with_capacity(self.containers.len());
let mut to_mount = Vec::with_capacity(self.containers.len());
for (container, state) in self.containers.iter() {
if let Some(autostart) = self
.manifest(container)
.expect("internal error")
.autostart
.as_ref()
{
autostarts.push((container.clone(), autostart.clone()));
if !state.is_mounted() {
to_mount.push(container.clone())
}
}
}
for (container, autostart) in &autostarts {
let manifest = self.manifest(container)?;
for mount in manifest.mounts.values() {
if let Mount::Resource(Resource { name, version, .. }) = mount {
if let Some(resource) =
State::match_container(name, version, self.containers.keys())
{
to_mount.push(resource.clone());
} else {
let error = Error::StartContainerMissingResource(
container.clone(),
name.clone(),
version.to_string(),
);
Self::warn_autostart_failure(container, autostart, error)?
}
}
}
}
if !to_mount.is_empty() {
self.mount_all(&to_mount).await;
}
if !autostarts.is_empty() {
for (container, autostart) in &autostarts {
info!("Autostarting {} ({:?})", container, autostart);
if let Err(e) = self
.start(container, None, &[], &HashMap::with_capacity(0))
.await
{
Self::warn_autostart_failure(container, autostart, e)?
}
}
let duration = start.elapsed();
let containers = autostarts.len();
info!("Successfully started {containers} container(s) in {duration:?}",);
}
Ok(())
}
fn warn_autostart_failure(
container: &Container,
autostart: &Autostart,
e: Error,
) -> Result<(), Error> {
match autostart {
Autostart::Relaxed => {
warn!("Failed to autostart relaxed {}: {}", container, e);
Ok(())
}
Autostart::Critical => {
error!("Failed to autostart critical {}: {}", container, e);
Err(e)
}
}
}
fn mount(&self, container: &Container) -> impl Future<Output = Result<PathBuf>> {
let container_state = self.containers.get(container).expect("internal error");
let repository = self
.repositories
.get(&container_state.repository)
.expect("internal error");
let key = repository.key().cloned();
let npk = self.npk(container).expect("internal error");
let root = self.config.run_dir.join(container.to_string());
let mount_control = self.mount_control.clone();
mount_control
.mount(npk, &root, key.as_ref(), self.selinux_enabled)
.map_ok(|_| root)
}
fn umount(&self, container: &Container) -> impl Future<Output = Result<(), Error>> {
if let Some(user) = self
.containers
.iter()
.filter_map(|(c, state)| state.process.as_ref().map(|process| (c, process)))
.find(|(_, process)| process.resources.contains(container))
.map(|(c, _)| c)
{
warn!(
"Failed to umount {} because it is used by {}",
container, user
);
return Either::Right(ready(Err(Error::UmountBusy(container.clone()))));
}
match self.state(container).and_then(|state| {
state
.root
.as_ref()
.ok_or_else(|| Error::UmountBusy(container.clone()))
}) {
Ok(root) => Either::Left(MountControl::umount(root).map_err(Error::from)),
Err(e) => Either::Right(ready(Err(e))),
}
}
pub(super) async fn start(
&mut self,
container: &Container,
init: Option<NonNulString>,
args_extra: &[NonNulString],
env_extra: &HashMap<NonNulString, NonNulString>,
) -> Result<(), Error> {
let start = time::Instant::now();
info!("Trying to start {}", container);
let container_state = self.state(container)?;
if container_state.process.is_some() {
warn!("Application {} is already running", container);
return Err(Error::StartContainerStarted(container.clone()));
}
if let Some(container) = self
.containers
.iter()
.filter_map(|(k, v)| v.process.as_ref().map(|_| k))
.find(|c| c.name() == container.name())
{
warn!("Application {} is already running", container);
return Err(Error::StartContainerStarted(container.clone()));
}
if env_extra.keys().any(|k| {
k.as_str() == env::NAME
|| k.as_str() == env::VERSION
|| k.as_str() == env::CONTAINER
|| k.as_str() == env::CONSOLE
}) {
return Err(Error::InvalidArguments(format!(
"env contains reserved key {} or {} or {} or {}",
env::NAME,
env::VERSION,
env::CONTAINER,
env::CONSOLE
)));
}
let manifest = self.manifest(container)?.clone();
let init = if let Some(init) = init {
if let Some(ref i) = manifest.init {
unsafe { NonNulString::from_string_unchecked(init.replace("<INIT>", i)) }
} else {
init
}
} else {
manifest.init.clone().ok_or_else(|| {
warn!("Container {} is a resource", container);
Error::StartContainerResource(container.clone())
})?
};
let mut need_mount = HashSet::new();
let mut resources = HashSet::new();
if !container_state.is_mounted() {
need_mount.insert(container.clone());
}
let required_resources = manifest
.mounts
.values()
.filter_map(|m| match m {
Mount::Resource(resource) => Some(resource),
_ => None,
})
.collect::<Vec<_>>();
for resource in required_resources {
let best_match =
State::match_container(&resource.name, &resource.version, self.containers.keys())
.ok_or_else(|| {
Error::StartContainerMissingResource(
container.clone(),
resource.name.clone(),
resource.version.to_string(),
)
})?;
let state = self
.state(best_match)
.expect("failed to determine resource container state");
resources.insert(best_match.clone());
if !state.is_mounted() {
need_mount.insert(best_match.clone());
}
}
if !need_mount.is_empty() {
info!(
"Mounting {} container(s) for the start of {}",
need_mount.len(),
container
);
for mount in self.mount_all(&Vec::from_iter(need_mount)).await {
if let Err(e) = mount {
warn!("failed to mount: {}", e);
return Err(e);
}
}
}
info!("Creating {}", container);
let stop = CancellationToken::new();
let console_fd = if let Some(contianer_configuration) = manifest.console.clone() {
let peer = Peer::Container(container.clone());
let (runtime_stream, container_stream) =
StdUnixStream::pair().expect("failed to create socketpair");
let container_fd: OwnedFd = container_stream.into();
let runtime = runtime_stream
.set_nonblocking(true)
.and_then(|_| UnixStream::from_std(runtime_stream))
.expect("failed to set socket into nonblocking mode");
let notifications = self.notification_tx.subscribe();
let events_tx = self.events_tx.clone();
let stop = stop.clone();
let container = Some(container.clone());
let options = self
.config
.console
.options
.clone()
.unwrap_or_default()
.into();
let permissions = contianer_configuration.permissions.into();
let connection = Console::connection(
runtime,
peer,
stop,
container,
options,
permissions,
events_tx,
notifications,
None,
);
task::spawn(connection);
Some(container_fd)
} else {
None
};
let ContainerIo { io } = io::open(container, &manifest.io.clone().unwrap_or_default())
.await
.expect("IO setup error");
let (socket_fds, sockets) = sockets::open(
self.config.socket_dir.as_path(),
container,
&manifest.sockets,
)
.await
.expect("Socket setup error");
persistence::setup(&self.config, &manifest).await?;
let config = &self.config;
let containers = self.containers.keys();
let pid = self
.forker
.create(
container,
config,
&manifest,
io,
console_fd,
socket_fds,
containers,
self.selinux_enabled,
)
.await?;
super::debug::start(&self.config, container, pid).await?;
let cgroups = {
let config = manifest.cgroups.clone().unwrap_or_default();
let events_tx = self.events_tx.clone();
cgroups::CGroups::new(&self.config.cgroup, events_tx, container, &config, pid)
.await
.expect("failed to create cgroup")
};
let mut args = Vec::with_capacity(
1 + if args_extra.is_empty() {
manifest.args.len()
} else {
args_extra.len()
},
);
args.push(init.clone());
if !args_extra.is_empty() {
args.extend(args_extra.iter().cloned());
} else {
args.extend(manifest.args.iter().cloned());
};
let env = if env_extra.is_empty() {
&manifest.env
} else {
env_extra
};
let env = env
.iter()
.map(|(k, v)| format!("{k}={v}"))
.chain(once(format!("{}={}", env::CONTAINER, container)))
.chain(once(format!("{}={}", env::NAME, container.name())))
.chain(once(format!("{}={}", env::VERSION, container.version())))
.map(|s| unsafe { NonNulString::from_string_unchecked(s) })
.collect::<Vec<_>>();
debug!("Container {} init is {:?}", container, init);
debug!(
"Container {} argv is \"{}\"",
container,
args.iter().join(" ")
);
debug!(
"Container {} env is \"{}\"",
container,
env.iter().join(", ")
);
if let Err(e) = self.forker.exec(container.clone(), init, args, env).await {
warn!("Failed to exec {} ({}): {}", container, pid, e);
stop.cancel();
cgroups.destroy().await;
return Err(e);
}
let container_state = self.containers.get_mut(container).expect("Internal error");
let started = time::Instant::now();
container_state.process = Some(ContainerContext {
pid,
started,
cgroups,
sockets,
stop,
resources,
});
let duration = start.elapsed().as_secs_f32();
info!("Started {} ({}) in {:.03}s", container, pid, duration);
self.container_event(container, ContainerEvent::Started);
Ok(())
}
pub(super) async fn kill(
&mut self,
container: &Container,
signal: Signal,
) -> Result<(), Error> {
let container_state = self.state_mut(container)?;
match &mut container_state.process {
Some(context) => {
info!("Killing {} with {}", container, signal.as_str());
let pid = context.pid;
let process_group = nix::unistd::Pid::from_raw(-(pid as i32));
match nix::sys::signal::kill(process_group, Some(signal)) {
Ok(_) => Ok(()),
Err(nix::Error::ESRCH) => {
debug!("Process {} already exited", pid);
Ok(())
}
Err(e) => unimplemented!("Kill error {}", e),
}
}
None => Err(Error::StopContainerNotStarted(container.clone())),
}
}
pub(super) async fn shutdown(
mut self,
event_rx: impl Stream<Item = Event>,
) -> Result<(), Error> {
let started_containers = self
.containers
.iter()
.filter_map(|(container, state)| state.process.as_ref().map(|_| container.clone()))
.collect::<Vec<_>>();
for container in &started_containers {
self.kill(container, Signal::SIGKILL).await?;
}
pin!(event_rx);
while self
.containers
.values()
.any(|state| state.process.is_some())
{
if let Some(Event::Container(container, event)) = event_rx.next().await {
self.on_event(&container, &event, true).await?;
}
}
let to_umount = self
.containers
.iter()
.filter(|(_, state)| state.is_mounted())
.map(|(container, _)| container.clone())
.collect::<Vec<_>>();
self.umount_all(&to_umount).await;
Ok(())
}
async fn install(
&mut self,
id: &str,
rx: &mut mpsc::Receiver<Bytes>,
) -> Result<Container, Error> {
let repository = self
.repositories
.get_mut(id)
.ok_or_else(|| Error::InvalidRepository(id.to_string()))?;
let container = repository.insert(rx).await?;
let already_installed = self
.state(&container)
.ok()
.map(|state| state.repository.clone());
if let Some(current_repository) = already_installed {
warn!(
"Skipping duplicate container {} which is already in repository {}",
container, current_repository
);
let repository = self
.repositories
.get_mut(id)
.ok_or_else(|| Error::InvalidRepository(id.to_string()))?;
repository.remove(&container).await?;
return Err(Error::InstallDuplicate(container));
}
self.containers.insert(
container.clone(),
ContainerState {
repository: id.into(),
..Default::default()
},
);
info!("Successfully installed {}", container);
self.container_event(&container, ContainerEvent::Installed);
Ok(container)
}
async fn uninstall(&mut self, container: &Container, wipe: bool) -> Result<(), Error> {
info!("Trying to uninstall {}", container);
let state = self.state(container)?;
let repository = state.repository.clone();
if state.is_mounted() {
self.umount_all(&[container.clone()])
.await
.pop()
.expect("internal error")?;
}
debug!("Removing {} from {}", container, repository);
self.repositories
.get_mut(&repository)
.expect("Internal error")
.remove(container)
.await?;
if wipe {
let name: &str = container.name().as_ref();
let dir = self.config.data_dir.join(name);
if dir.exists() {
info!(
"Wiping persistent data dir {} of {}",
dir.display(),
container
);
if let Err(e) = fs::remove_dir_all(&dir)
.await
.with_context(|| format!("failed to remove {}", dir.display()))
{
warn!("Failed to remove {}: {}", dir.display(), e);
}
}
}
self.containers.remove(container);
info!("Uninstalled {}", container);
self.container_event(container, ContainerEvent::Uninstalled);
Ok(())
}
async fn on_exit(
&mut self,
container: &Container,
exit_status: &ExitStatus,
is_shutdown: bool,
) -> Result<(), Error> {
let autostart = self.manifest(container)?.autostart.clone();
if let Ok(state) = self.state_mut(container) {
if let Some(process) = state.process.take() {
let is_critical = autostart == Some(Autostart::Critical);
let is_critical = is_critical && !is_shutdown;
let duration = process.started.elapsed();
if is_critical {
error!(
"Critical process {} exited after {:?} with status {}",
container, duration, exit_status,
);
} else {
info!(
"Process {} exited after {:?} with status {}",
container, duration, exit_status,
);
}
process.destroy().await;
self.container_event(container, ContainerEvent::Exit(exit_status.clone()));
info!("Container {} exited with status {}", container, exit_status);
if !exit_status.success() && is_critical {
return Err(Error::CriticalContainer(
container.clone(),
exit_status.clone(),
));
}
}
}
Ok(())
}
pub(super) async fn on_event(
&mut self,
container: &Container,
event: &ContainerEvent,
is_shutdown: bool,
) -> Result<(), Error> {
match event {
ContainerEvent::Started => (),
ContainerEvent::Exit(exit_status) => {
self.on_exit(container, exit_status, is_shutdown).await?;
}
ContainerEvent::Installed => (),
ContainerEvent::Uninstalled => (),
ContainerEvent::CGroup(CGroupEvent::Memory(_)) => {
warn!("Process {} is out of memory", container);
}
}
Ok(())
}
pub(super) async fn on_request(
&mut self,
request: Request,
response: oneshot::Sender<model::Response>,
) -> Result<(), Error> {
match request {
Request::Request(ref request) => {
let payload = match request {
model::Request::List => model::Response::List(self.list_containers()),
model::Request::Install { .. } => unreachable!(),
model::Request::Mount { containers } => {
let result = self
.mount_all(containers)
.await
.drain(..)
.zip(containers)
.map(|(r, c)| match r {
Ok(r) => model::MountResult::Ok { container: r },
Err(e) => model::MountResult::Error {
container: c.clone(),
error: e.into(),
},
})
.collect();
model::Response::Mount(result)
}
model::Request::Umount { containers } => {
let result = self
.umount_all(containers)
.await
.drain(..)
.zip(containers)
.map(|(r, c)| match r {
Ok(r) => model::UmountResult::Ok { container: r },
Err(e) => model::UmountResult::Error {
container: c.clone(),
error: e.into(),
},
})
.collect();
model::Response::Umount(result)
}
model::Request::Repositories => {
let repositories = self.repositories.keys().cloned().collect();
model::Response::Repositories(repositories)
}
model::Request::Shutdown => {
self.events_tx
.send(Event::Shutdown)
.await
.expect("Internal channel error on main");
model::Response::Shutdown
}
model::Request::Start {
container,
init,
arguments,
environment,
} => {
let result = match self
.start(container, init.clone(), arguments, environment)
.await
{
Ok(_) => model::StartResult::Ok {
container: container.clone(),
},
Err(e) => {
warn!("failed to start {}: {}", container, e);
model::StartResult::Error {
container: container.clone(),
error: e.into(),
}
}
};
model::Response::Start(result)
}
model::Request::Kill { container, signal } => {
let result = match Signal::try_from(*signal) {
Ok(signal) => match self.kill(container, signal).await {
Ok(_) => model::KillResult::Ok {
container: container.clone(),
},
Err(e) => {
error!("failed to kill {} with {}: {}", container, signal, e);
model::KillResult::Error {
container: container.clone(),
error: e.into(),
}
}
},
Err(e) => {
error!("failed to kill {} with {}: {}", container, signal, e);
let error = model::Error::Unexpected {
error: e.to_string(),
};
model::KillResult::Error {
container: container.clone(),
error,
}
}
};
model::Response::Kill(result)
}
model::Request::Uninstall { container, wipe } => {
let result = match self.uninstall(container, *wipe).await {
Ok(_) => model::UninstallResult::Ok {
container: container.clone(),
},
Err(e) => {
warn!("failed to uninstall {}: {}", container, e);
model::UninstallResult::Error {
container: container.clone(),
error: e.into(),
}
}
};
model::Response::Uninstall(result)
}
model::Request::Inspect { container } => match self.inspect(container) {
Ok(data) => model::Response::Inspect(model::InspectResult::Ok {
container: container.clone(),
data: Box::new(data),
}),
Err(e) => model::Response::Inspect(model::InspectResult::Error {
container: container.clone(),
error: e.into(),
}),
},
model::Request::Ident => unreachable!(), model::Request::TokenCreate { .. } => unreachable!(), model::Request::TokenVerify { .. } => unreachable!(), };
response.send(payload).ok();
}
Request::Install(repository, mut rx) => {
let payload = match self.install(&repository, &mut rx).await {
Ok(container) => {
model::Response::Install(model::InstallResult::Ok { container })
}
Err(e) => {
model::Response::Install(model::InstallResult::Error { error: e.into() })
}
};
response.send(payload).ok();
}
}
Ok(())
}
async fn mount_all(&mut self, containers: &[Container]) -> Vec<Result<Container, Error>> {
let start = time::Instant::now();
let mut mounts = Vec::with_capacity(containers.len());
for container in containers {
match self.state(container) {
Ok(state) if state.is_mounted() => {
mounts.push(Either::Left(ready(Err(Error::InvalidContainer(
container.clone(),
)))));
}
Ok(_) => mounts.push(Either::Right(self.mount(container).map_err(|e| e.into()))),
Err(_) => {
mounts.push(Either::Left(ready(Err(Error::InvalidContainer(
container.clone(),
)))));
}
}
}
let mut result = Vec::with_capacity(containers.len());
for (container, mount_result) in containers.iter().zip(join_all(mounts).await) {
match mount_result {
Ok(root) => {
let state = self.state_mut(container).expect("Internal error");
state.root = Some(root);
info!("Mounted {container}");
result.push(Ok(container.clone()));
}
Err(e) => {
warn!("Failed to mount {}: {}", container, e);
result.push(Err(e));
}
}
}
if result.iter().any(|e| e.is_err()) {
warn!("Mount operation failed");
} else {
info!(
"Successfully mounted {} container(s) in {:?}",
result.len(),
start.elapsed()
);
}
result
}
async fn umount_all(&mut self, containers: &[Container]) -> Vec<Result<Container, Error>> {
let start = time::Instant::now();
let mut mounts = Vec::with_capacity(containers.len());
'outer: for umount_container in containers {
let (container_state, manifest) = if let Ok((state, manifest)) =
self.state(umount_container).and_then(|state| {
self.manifest(umount_container)
.map(|manifest| (state, manifest))
}) {
(state, manifest)
} else {
let error = Err(Error::InvalidContainer(umount_container.clone()));
mounts.push(Either::Right(ready(error)));
continue;
};
if !container_state.is_mounted() {
let error = Err(Error::UmountBusy(umount_container.clone()));
mounts.push(Either::Right(ready(error)));
continue;
}
if container_state.process.is_some() {
let error = Err(Error::UmountBusy(umount_container.clone()));
mounts.push(Either::Right(ready(error)));
continue;
}
if manifest.init.is_none() {
for (running_container, state) in &self.containers {
if state.process.is_none() {
continue;
}
let manifest = self.manifest(running_container).expect("Internal error");
if manifest.init.is_none() {
continue;
}
for mount in &manifest.mounts {
if let Mount::Resource(Resource { name, version, .. }) = mount.1 {
if State::match_container(name, version, self.containers.keys())
.filter(|resource| &umount_container == resource)
.is_some()
{
warn!(
"Resource container {} is used by {}",
umount_container, running_container
);
let error = Err(Error::UmountBusy(running_container.clone()));
mounts.push(Either::Right(ready(error)));
continue 'outer;
}
}
}
}
}
mounts.push(Either::Left(self.umount(umount_container)));
}
debug_assert_eq!(mounts.len(), containers.len());
let mut result = Vec::with_capacity(containers.len());
for (container, mount_result) in containers.iter().zip(join_all(mounts).await) {
match mount_result {
Ok(_) => {
let state = self.state_mut(container).expect("Internal error");
state.root = None;
info!("Umounted {}", container);
result.push(Ok(container.clone()));
}
Err(e) => {
warn!("failed to umount {}: {}", container, e);
result.push(Err(e));
}
}
}
let duration = start.elapsed();
if result.iter().any(|e| e.is_err()) {
warn!("Umount operation failed after {duration:?}",);
} else {
let containers = result.len();
info!("Successfully umounted {containers} container(s) in {duration:?}",);
}
result
}
pub fn match_container<'a, I: Iterator<Item = &'a Container>>(
name: &Name,
version_req: &VersionReq,
containers: I,
) -> Option<&'a Container> {
containers
.filter(|c| c.name() == name && version_req.matches(c.version()))
.sorted_by(|c1, c2| c1.version().cmp(c2.version()))
.next()
}
fn inspect(&self, container: &Container) -> Result<api::model::ContainerData, Error> {
let state = self
.containers
.get(container)
.ok_or_else(|| Error::InvalidContainer(container.clone()))?;
let manifest = self.manifest(container)?.clone();
let runtime_info = state.process.as_ref();
let process = runtime_info.map(|context| api::model::Process {
pid: context.pid,
uptime: context.started.elapsed().as_nanos() as u64,
statistics: context.cgroups.stats(),
});
let repository = state.repository.clone();
let mounted = state.is_mounted();
Ok(api::model::ContainerData {
manifest,
repository,
mounted,
process,
})
}
fn list_containers(&self) -> Vec<api::model::Container> {
self.containers.keys().cloned().collect()
}
fn container_event(&self, container: &Container, event: ContainerEvent) {
if self.notification_tx.receiver_count() > 0 {
self.notification_tx.send((container.clone(), event)).ok();
}
}
fn state(&self, container: &Container) -> Result<&ContainerState, Error> {
self.containers
.get(container)
.ok_or_else(|| Error::InvalidContainer(container.clone()))
}
fn state_mut(&mut self, container: &Container) -> Result<&mut ContainerState, Error> {
self.containers
.get_mut(container)
.ok_or_else(|| Error::InvalidContainer(container.clone()))
}
fn npk(&self, container: &Container) -> Result<&Npk, Error> {
let state = self.state(container)?;
Ok(self
.repository(&state.repository)?
.get(container)
.expect("container has invalid repository reference"))
}
fn manifest(&self, container: &Container) -> Result<&Manifest, Error> {
self.npk(container).map(|npk| npk.manifest())
}
fn repository(&self, repository: &str) -> Result<&Repository, Error> {
self.repositories
.get(repository)
.ok_or_else(|| Error::InvalidRepository(repository.into()))
}
}
fn is_selinux_enabled() -> bool {
let enabled = Path::new("/sys/fs/selinux/enforce").exists();
debug!(
"SELinux is {}",
if enabled { "enabled" } else { "disabled" }
);
enabled
}
#[test]
#[allow(clippy::unwrap_used)]
fn find_newest_resource() {
use std::str::FromStr;
let old = Container::try_from("test:0.0.1").unwrap();
let new = Container::try_from("test:0.0.2").unwrap();
let other = Container::try_from("other:1.0.0").unwrap();
let containers = [old, new.clone(), other];
let resource = State::match_container(
&Name::try_from("test").unwrap(),
&VersionReq::from_str(">=0.0.2").unwrap(),
&mut containers.iter(),
);
assert!(resource.is_some());
assert_eq!(resource.unwrap(), &new);
}
#[test]
#[allow(clippy::unwrap_used)]
fn cannot_find_newer_resource() {
use std::str::FromStr;
let old = Container::try_from("test:0.0.1").unwrap();
let new = Container::try_from("test:0.0.2").unwrap();
let other = Container::try_from("other:1.0.0").unwrap();
let containers = [old, new, other];
let resource = State::match_container(
&Name::try_from("test").unwrap(),
&VersionReq::from_str(">=0.0.3").unwrap(),
&mut containers.iter(),
);
assert!(resource.is_none());
}