use crate::{
api::model::Container,
runtime::{
cgroups,
config::Config,
console,
events::{ContainerEvent, Event},
exit_status::ExitStatus,
fork,
fork::Streams,
ipc::AsyncFramedUnixStream,
state::State,
},
};
use async_stream::stream;
use futures::{
future::{ready, Either},
FutureExt, StreamExt,
};
use log::{debug, info};
use nix::{
sys::wait::{waitpid, WaitStatus},
unistd,
};
use std::{future::Future, path::Path};
use sync::mpsc;
use thiserror::Error;
use tokio::{
pin, select,
sync::{self, broadcast},
task::{self, JoinHandle},
};
use tokio_util::sync::{CancellationToken, DropGuard};
/// Sending half of the broadcast channel carrying `(Container, ContainerEvent)` notifications.
pub(crate) type NotificationTx = broadcast::Sender<(Container, ContainerEvent)>;
/// Process id as used throughout the runtime.
pub(crate) type Pid = u32;
/// Error type returned by the public [`Runtime`] API.
///
/// A transparent newtype around [`anyhow::Error`]: display and source are
/// forwarded to the wrapped error, and `?` converts from `anyhow::Error`.
#[derive(Error, Debug)]
#[error(transparent)]
pub struct Error(#[from] anyhow::Error);
/// Handle to the runtime in one of its two life cycle states.
///
/// A runtime is constructed as `Created` by [`Runtime::new`] and turned into
/// `Running` by [`Runtime::start`].
// NOTE(review): presumably `Created` is much larger than `Running` because it
// embeds `Config`; confirm before removing the allow.
#[allow(clippy::large_enum_variant)]
pub enum Runtime {
/// Configuration is checked and the forker is started; the main loop is not running yet.
Created {
/// Runtime configuration handed to the main loop on start.
config: Config,
/// Pid of the forker process and the streams connected to it.
forker: (Pid, Streams),
},
/// The main loop is running in a spawned task.
Running {
/// Guard of the cancellation token observed by the main loop.
guard: DropGuard,
/// Join handle of the task executing the main loop.
task: JoinHandle<anyhow::Result<()>>,
},
}
impl Runtime {
pub fn new(config: Config) -> Result<Runtime, Error> {
config.check()?;
let forker = fork::start()?;
Ok(Runtime::Created { config, forker })
}
pub async fn start(self) -> Result<Runtime, Error> {
let (config, forker) = if let Runtime::Created { config, forker } = self {
(config, forker)
} else {
panic!("Runtime::start called on a running runtime");
};
let token = CancellationToken::new();
let guard = token.clone().drop_guard();
let task = task::spawn(run(config, token, forker));
Ok(Runtime::Running { guard, task })
}
pub fn shutdown(self) -> impl Future<Output = Result<(), Error>> {
if let Runtime::Running { guard, task } = self {
drop(guard);
Either::Left({
task.then(|n| match n {
Ok(n) => ready(n.map_err(|e| e.into())),
Err(_) => ready(Ok(())),
})
})
} else {
Either::Right(ready(Ok(())))
}
}
pub async fn stopped(&mut self) -> Result<(), Error> {
match self {
Runtime::Running { ref mut task, .. } => match task.await {
Ok(r) => r.map_err(|e| e.into()),
Err(_) => Ok(()),
},
Runtime::Created { .. } => panic!("Stopped called on a stopped runtime"),
}
}
}
async fn run(
mut config: Config,
token: CancellationToken,
forker: (Pid, Streams),
) -> anyhow::Result<()> {
let cgroup = Path::new(config.cgroup.as_str()).to_owned();
cgroups::init(&cgroup).await?;
let (forker_pid, forker_channels) = forker;
let mut join_forker = task::spawn_blocking(move || {
let pid = unistd::Pid::from_raw(forker_pid as i32);
loop {
match waitpid(Some(pid), None) {
Ok(WaitStatus::Exited(_pid, status)) => {
break ExitStatus::Exit(status);
}
Ok(WaitStatus::Signaled(_pid, status, _)) => {
break ExitStatus::Signalled(status as u8);
}
Ok(WaitStatus::Continued(_)) | Ok(WaitStatus::Stopped(_, _)) => (),
Err(nix::Error::EINTR) => (),
e => panic!("failed to waitpid on {pid}: {e:?}"),
}
}
});
let (event_tx, mut event_rx) = mpsc::channel::<Event>(config.event_buffer_size);
let (notification_tx, _) = sync::broadcast::channel(config.notification_buffer_size);
let console = if let Some(global) = config.console.global.take() {
let mut console = console::Console::new(event_tx.clone(), notification_tx.clone());
let options = global.options.unwrap_or_default();
let permissions = global.permissions;
console
.listen(&global.bind, options.into(), permissions.into())
.await?;
Some(console)
} else {
None
};
let Streams {
command_stream,
socket_stream,
notification_stream,
} = forker_channels;
let forker = fork::Forker::new(command_stream, socket_stream);
let event_rx = stream! {
let mut exit_notifications = AsyncFramedUnixStream::new(notification_stream);
loop {
select! {
Some(event) = event_rx.recv() => yield event,
Ok(Some(fork::Notification::Exit { container, exit_status })) = exit_notifications.recv() => {
let event = ContainerEvent::Exit(exit_status);
yield Event::Container(container, event);
}
else => unimplemented!(),
}
}
};
pin!(event_rx);
let mut state = State::new(config, event_tx.clone(), notification_tx, forker).await?;
info!("Runtime up and running");
loop {
tokio::select! {
_ = token.cancelled() => event_tx.send(Event::Shutdown).await.expect("failed to send shutdown event"),
event = event_rx.next() => {
if let Err(e) = match event.expect("internal error") {
Event::Console(request, response) => state.on_request(request, response).await,
Event::Shutdown => {
debug!("Shutting down Northstar runtime");
if let Some(console) = console {
debug!("Shutting down console");
console.shutdown().await?;
}
break state.shutdown(event_rx).await;
}
Event::Container(container, event) => state.on_event(&container, &event, false).await,
} {
break Err(e);
}
}
exit_status = &mut join_forker => panic!("Forker exited with {exit_status:?}"),
}
}?;
debug!("Joining forker with pid {}", forker_pid);
join_forker.await.expect("failed to join forker");
info!("Shutting down cgroups");
cgroups::shutdown(&cgroup)
.await
.expect("failed to shutdown cgroups");
info!("Shutdown complete");
Ok(())
}