use self::fork::Streams;
use crate::{api, api::model::Container, runtime::ipc::AsyncFramedUnixStream};
use async_stream::stream;
use config::Config;
use console::{Configuration, Permissions};
use futures::{
future::{ready, Either},
FutureExt, StreamExt,
};
use log::{debug, info};
use nix::{
libc::{EXIT_FAILURE, EXIT_SUCCESS},
sys::{
self,
signal::Signal,
wait::{waitpid, WaitStatus},
},
unistd,
};
use serde::{Deserialize, Serialize};
use state::State;
use std::{convert::TryFrom, future::Future, path::Path};
use sync::mpsc;
use thiserror::Error;
use tokio::{
pin, select,
sync::{self, broadcast, oneshot},
task::{self, JoinHandle},
};
use tokio_util::sync::{CancellationToken, DropGuard};
mod cgroups;
mod console;
mod debug;
mod error;
mod fork;
mod io;
mod ipc;
mod key;
mod mount;
mod repository;
mod state;
mod stats;
mod token;
pub mod config;
type EventTx = mpsc::Sender<Event>;
type NotificationTx = broadcast::Sender<(Container, ContainerEvent)>;
type RepositoryId = String;
type ExitCode = i32;
type Pid = u32;
const ENV_NAME: &str = "NORTHSTAR_NAME";
const ENV_VERSION: &str = "NORTHSTAR_VERSION";
const ENV_CONTAINER: &str = "NORTHSTAR_CONTAINER";
const ENV_CONSOLE: &str = "NORTHSTAR_CONSOLE";
#[derive(Debug)]
enum Event {
Console(console::Request, oneshot::Sender<api::model::Response>),
Shutdown,
Container(Container, ContainerEvent),
}
#[derive(Clone, Debug)]
enum ContainerEvent {
Started,
Exit(ExitStatus),
Installed,
Uninstalled,
CGroup(CGroupEvent),
}
#[derive(Clone, Debug)]
enum CGroupEvent {
Memory(MemoryEvent),
}
#[derive(Clone, Default, Debug)]
struct MemoryEvent {
low: Option<u64>,
high: Option<u64>,
max: Option<u64>,
oom: Option<u64>,
oom_kill: Option<u64>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExitStatus {
Exit(ExitCode),
Signalled(u8),
}
impl From<Signal> for ExitStatus {
fn from(signal: Signal) -> Self {
ExitStatus::Signalled(signal as u8)
}
}
impl From<ExitCode> for ExitStatus {
fn from(code: ExitCode) -> Self {
ExitStatus::Exit(code)
}
}
impl ExitStatus {
pub const SUCCESS: ExitCode = EXIT_SUCCESS;
pub const FAILURE: ExitCode = EXIT_FAILURE;
pub fn success(&self) -> bool {
matches!(self, ExitStatus::Exit(code) if *code == Self::SUCCESS)
}
}
impl std::fmt::Display for ExitStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExitStatus::Exit(code) => write!(f, "Exit({})", code),
ExitStatus::Signalled(signal) => match sys::signal::Signal::try_from(*signal as i32) {
Ok(signal) => write!(f, "Signalled({})", signal),
Err(_) => write!(f, "Signalled({})", signal),
},
}
}
}
#[derive(Error, Debug)]
#[error(transparent)]
pub struct Error(#[from] anyhow::Error);
#[allow(clippy::large_enum_variant)]
pub enum Runtime {
Created {
config: Config,
forker: (Pid, Streams),
},
Running {
guard: DropGuard,
task: JoinHandle<anyhow::Result<()>>,
},
}
impl Runtime {
pub fn new(config: Config) -> Result<Runtime, Error> {
config.check()?;
let forker = fork::start()?;
Ok(Runtime::Created { config, forker })
}
pub async fn start(self) -> Result<Runtime, Error> {
let (config, forker) = if let Runtime::Created { config, forker } = self {
(config, forker)
} else {
panic!("Runtime::start called on a running runtime");
};
let token = CancellationToken::new();
let guard = token.clone().drop_guard();
let task = task::spawn(run(config, token, forker));
Ok(Runtime::Running { guard, task })
}
pub fn shutdown(self) -> impl Future<Output = Result<(), Error>> {
if let Runtime::Running { guard, task } = self {
drop(guard);
Either::Left({
task.then(|n| match n {
Ok(n) => ready(n.map_err(|e| e.into())),
Err(_) => ready(Ok(())),
})
})
} else {
Either::Right(ready(Ok(())))
}
}
pub async fn stopped(&mut self) -> Result<(), Error> {
match self {
Runtime::Running { ref mut task, .. } => match task.await {
Ok(r) => r.map_err(|e| e.into()),
Err(_) => Ok(()),
},
Runtime::Created { .. } => panic!("Stopped called on a stopped runtime"),
}
}
}
async fn run(
config: Config,
token: CancellationToken,
forker: (Pid, Streams),
) -> anyhow::Result<()> {
let cgroup = Path::new(config.cgroup.as_str()).to_owned();
cgroups::init(&cgroup).await?;
let (forker_pid, forker_channels) = forker;
let mut join_forker = task::spawn_blocking(move || {
let pid = unistd::Pid::from_raw(forker_pid as i32);
loop {
match waitpid(Some(pid), None) {
Ok(WaitStatus::Exited(_pid, status)) => {
break ExitStatus::Exit(status);
}
Ok(WaitStatus::Signaled(_pid, status, _)) => {
break ExitStatus::Signalled(status as u8);
}
Ok(WaitStatus::Continued(_)) | Ok(WaitStatus::Stopped(_, _)) => (),
Err(nix::Error::EINTR) => (),
e => panic!("failed to waitpid on {}: {:?}", pid, e),
}
}
});
let (event_tx, mut event_rx) = mpsc::channel::<Event>(config.event_buffer_size);
let (notification_tx, _) = sync::broadcast::channel(config.notification_buffer_size);
let console = if let Some(url) = config.debug.as_ref().map(|d| &d.console) {
let mut console = console::Console::new(event_tx.clone(), notification_tx.clone());
let configuration = Configuration {
permissions: Permissions::full(),
..Default::default()
};
console
.listen(url, &configuration, config.token_validity)
.await?;
Some(console)
} else {
None
};
let Streams {
command_stream,
socket_stream,
notification_stream,
} = forker_channels;
let forker = fork::Forker::new(command_stream, socket_stream);
let event_rx = stream! {
let mut exit_notifications = AsyncFramedUnixStream::new(notification_stream);
loop {
select! {
Some(event) = event_rx.recv() => yield event,
Ok(Some(fork::Notification::Exit { container, exit_status })) = exit_notifications.recv() => {
let event = ContainerEvent::Exit(exit_status);
yield Event::Container(container, event);
}
else => unimplemented!(),
}
}
};
pin!(event_rx);
let mut state = State::new(config, event_tx.clone(), notification_tx, forker).await?;
info!("Runtime up and running");
loop {
tokio::select! {
_ = token.cancelled() => event_tx.send(Event::Shutdown).await.expect("failed to send shutdown event"),
event = event_rx.next() => {
if let Err(e) = match event.expect("internal error") {
Event::Console(request, response) => state.on_request(request, response).await,
Event::Shutdown => {
debug!("Shutting down Northstar runtime");
if let Some(console) = console {
debug!("Shutting down console");
console.shutdown().await?;
}
break state.shutdown(event_rx).await;
}
Event::Container(container, event) => state.on_event(&container, &event, false).await,
} {
break Err(e);
}
}
exit_status = &mut join_forker => panic!("Forker exited with {:?}", exit_status),
}
}?;
debug!("Joining forker with pid {}", forker_pid);
join_forker.await.expect("failed to join forker");
info!("Shutting down cgroups");
cgroups::shutdown(&cgroup)
.await
.expect("failed to shutdown cgroups");
info!("Shutdown complete");
Ok(())
}