northstar_runtime/runtime/
runtime.rs1use crate::{
2 api::model::Container,
3 runtime::{
4 cgroups,
5 config::Config,
6 console,
7 events::{ContainerEvent, Event},
8 exit_status::ExitStatus,
9 fork,
10 fork::Streams,
11 ipc::AsyncFramedUnixStream,
12 state::State,
13 },
14};
15use async_stream::stream;
16use futures::{
17 future::{ready, Either},
18 FutureExt, StreamExt,
19};
20use log::{debug, info};
21use nix::{
22 sys::wait::{waitpid, WaitStatus},
23 unistd,
24};
25use std::{future::Future, path::Path};
26use sync::mpsc;
27use thiserror::Error;
28use tokio::{
29 pin, select,
30 sync::{self, broadcast},
31 task::{self, JoinHandle},
32};
33use tokio_util::sync::{CancellationToken, DropGuard};
34
35pub(crate) type NotificationTx = broadcast::Sender<(Container, ContainerEvent)>;
36pub(crate) type Pid = u32;
37
38#[derive(Error, Debug)]
40#[error(transparent)]
41pub struct Error(#[from] anyhow::Error);
42
43#[allow(clippy::large_enum_variant)]
45pub enum Runtime {
46 Created {
48 config: Config,
50 forker: (Pid, Streams),
52 },
53 Running {
55 guard: DropGuard,
57 task: JoinHandle<anyhow::Result<()>>,
59 },
60}
61
62impl Runtime {
63 pub fn new(config: Config) -> Result<Runtime, Error> {
65 config.check()?;
66 let forker = fork::start()?;
67 Ok(Runtime::Created { config, forker })
68 }
69
70 pub async fn start(self) -> Result<Runtime, Error> {
72 let (config, forker) = if let Runtime::Created { config, forker } = self {
73 (config, forker)
74 } else {
75 panic!("Runtime::start called on a running runtime");
76 };
77
78 let token = CancellationToken::new();
79 let guard = token.clone().drop_guard();
80
81 let task = task::spawn(run(config, token, forker));
83
84 Ok(Runtime::Running { guard, task })
85 }
86
87 pub fn shutdown(self) -> impl Future<Output = Result<(), Error>> {
89 if let Runtime::Running { guard, task } = self {
90 drop(guard);
91 Either::Left({
92 task.then(|n| match n {
93 Ok(n) => ready(n.map_err(|e| e.into())),
94 Err(_) => ready(Ok(())),
95 })
96 })
97 } else {
98 Either::Right(ready(Ok(())))
99 }
100 }
101
102 pub async fn stopped(&mut self) -> Result<(), Error> {
104 match self {
105 Runtime::Running { ref mut task, .. } => match task.await {
106 Ok(r) => r.map_err(|e| e.into()),
107 Err(_) => Ok(()),
108 },
109 Runtime::Created { .. } => panic!("Stopped called on a stopped runtime"),
110 }
111 }
112}
113
114async fn run(
116 mut config: Config,
117 token: CancellationToken,
118 forker: (Pid, Streams),
119) -> anyhow::Result<()> {
120 let cgroup = Path::new(config.cgroup.as_str()).to_owned();
122 cgroups::init(&cgroup).await?;
123
124 let (forker_pid, forker_channels) = forker;
126 let mut join_forker = task::spawn_blocking(move || {
127 let pid = unistd::Pid::from_raw(forker_pid as i32);
128 loop {
129 match waitpid(Some(pid), None) {
130 Ok(WaitStatus::Exited(_pid, status)) => {
131 break ExitStatus::Exit(status);
132 }
133 Ok(WaitStatus::Signaled(_pid, status, _)) => {
134 break ExitStatus::Signalled(status as u8);
135 }
136 Ok(WaitStatus::Continued(_)) | Ok(WaitStatus::Stopped(_, _)) => (),
137 Err(nix::Error::EINTR) => (),
138 e => panic!("failed to waitpid on {pid}: {e:?}"),
139 }
140 }
141 });
142
143 let (event_tx, mut event_rx) = mpsc::channel::<Event>(config.event_buffer_size);
145 let (notification_tx, _) = sync::broadcast::channel(config.notification_buffer_size);
146
147 let console = if let Some(global) = config.console.global.take() {
149 let mut console = console::Console::new(event_tx.clone(), notification_tx.clone());
150 let options = global.options.unwrap_or_default();
151 let permissions = global.permissions;
152 console
153 .listen(&global.bind, options.into(), permissions.into())
154 .await?;
155 Some(console)
156 } else {
157 None
158 };
159
160 let Streams {
163 command_stream,
164 socket_stream,
165 notification_stream,
166 } = forker_channels;
167 let forker = fork::Forker::new(command_stream, socket_stream);
168
169 let event_rx = stream! {
171 let mut exit_notifications = AsyncFramedUnixStream::new(notification_stream);
172 loop {
173 select! {
174 Some(event) = event_rx.recv() => yield event,
175 Ok(Some(fork::Notification::Exit { container, exit_status })) = exit_notifications.recv() => {
176 let event = ContainerEvent::Exit(exit_status);
177 yield Event::Container(container, event);
178 }
179 else => unimplemented!(),
180 }
181 }
182 };
183 pin!(event_rx);
184
185 let mut state = State::new(config, event_tx.clone(), notification_tx, forker).await?;
186
187 info!("Runtime up and running");
188
189 loop {
191 tokio::select! {
192 _ = token.cancelled() => event_tx.send(Event::Shutdown).await.expect("failed to send shutdown event"),
194 event = event_rx.next() => {
196 if let Err(e) = match event.expect("internal error") {
197 Event::Console(request, response) => state.on_request(request, response).await,
199 Event::Shutdown => {
201 debug!("Shutting down Northstar runtime");
202 if let Some(console) = console {
203 debug!("Shutting down console");
204 console.shutdown().await?;
205 }
206 break state.shutdown(event_rx).await;
207 }
208 Event::Container(container, event) => state.on_event(&container, &event, false).await,
210 } {
211 break Err(e);
212 }
213 }
214 exit_status = &mut join_forker => panic!("Forker exited with {exit_status:?}"),
215 }
216 }?;
217
218 debug!("Joining forker with pid {}", forker_pid);
220 join_forker.await.expect("failed to join forker");
221
222 info!("Shutting down cgroups");
223 cgroups::shutdown(&cgroup)
224 .await
225 .expect("failed to shutdown cgroups");
226
227 info!("Shutdown complete");
228
229 Ok(())
230}