northstar_runtime/runtime/
runtime.rs

1use crate::{
2    api::model::Container,
3    runtime::{
4        cgroups,
5        config::Config,
6        console,
7        events::{ContainerEvent, Event},
8        exit_status::ExitStatus,
9        fork,
10        fork::Streams,
11        ipc::AsyncFramedUnixStream,
12        state::State,
13    },
14};
15use async_stream::stream;
16use futures::{
17    future::{ready, Either},
18    FutureExt, StreamExt,
19};
20use log::{debug, info};
21use nix::{
22    sys::wait::{waitpid, WaitStatus},
23    unistd,
24};
25use std::{future::Future, path::Path};
26use sync::mpsc;
27use thiserror::Error;
28use tokio::{
29    pin, select,
30    sync::{self, broadcast},
31    task::{self, JoinHandle},
32};
33use tokio_util::sync::{CancellationToken, DropGuard};
34
35pub(crate) type NotificationTx = broadcast::Sender<(Container, ContainerEvent)>;
36pub(crate) type Pid = u32;
37
38/// Runtime error
39#[derive(Error, Debug)]
40#[error(transparent)]
41pub struct Error(#[from] anyhow::Error);
42
43/// Runtime handle
44#[allow(clippy::large_enum_variant)]
45pub enum Runtime {
46    /// The runtime is created but not yet started.
47    Created {
48        /// Runtime configuration
49        config: Config,
50        /// Forker pid and streams
51        forker: (Pid, Streams),
52    },
53    /// The runtime is started.
54    Running {
55        /// Drop guard to stop the runtime
56        guard: DropGuard,
57        /// Runtime task
58        task: JoinHandle<anyhow::Result<()>>,
59    },
60}
61
62impl Runtime {
63    /// Create new runtime instance
64    pub fn new(config: Config) -> Result<Runtime, Error> {
65        config.check()?;
66        let forker = fork::start()?;
67        Ok(Runtime::Created { config, forker })
68    }
69
70    /// Start runtime with configuration `config`
71    pub async fn start(self) -> Result<Runtime, Error> {
72        let (config, forker) = if let Runtime::Created { config, forker } = self {
73            (config, forker)
74        } else {
75            panic!("Runtime::start called on a running runtime");
76        };
77
78        let token = CancellationToken::new();
79        let guard = token.clone().drop_guard();
80
81        // Start a task that drives the main loop and wait for shutdown results
82        let task = task::spawn(run(config, token, forker));
83
84        Ok(Runtime::Running { guard, task })
85    }
86
87    /// Stop the runtime and wait for the termination
88    pub fn shutdown(self) -> impl Future<Output = Result<(), Error>> {
89        if let Runtime::Running { guard, task } = self {
90            drop(guard);
91            Either::Left({
92                task.then(|n| match n {
93                    Ok(n) => ready(n.map_err(|e| e.into())),
94                    Err(_) => ready(Ok(())),
95                })
96            })
97        } else {
98            Either::Right(ready(Ok(())))
99        }
100    }
101
102    /// Wait for the runtime to stop
103    pub async fn stopped(&mut self) -> Result<(), Error> {
104        match self {
105            Runtime::Running { ref mut task, .. } => match task.await {
106                Ok(r) => r.map_err(|e| e.into()),
107                Err(_) => Ok(()),
108            },
109            Runtime::Created { .. } => panic!("Stopped called on a stopped runtime"),
110        }
111    }
112}
113
114/// Main loop
115async fn run(
116    mut config: Config,
117    token: CancellationToken,
118    forker: (Pid, Streams),
119) -> anyhow::Result<()> {
120    // Setup root cgroup(s)
121    let cgroup = Path::new(config.cgroup.as_str()).to_owned();
122    cgroups::init(&cgroup).await?;
123
124    // Join forker
125    let (forker_pid, forker_channels) = forker;
126    let mut join_forker = task::spawn_blocking(move || {
127        let pid = unistd::Pid::from_raw(forker_pid as i32);
128        loop {
129            match waitpid(Some(pid), None) {
130                Ok(WaitStatus::Exited(_pid, status)) => {
131                    break ExitStatus::Exit(status);
132                }
133                Ok(WaitStatus::Signaled(_pid, status, _)) => {
134                    break ExitStatus::Signalled(status as u8);
135                }
136                Ok(WaitStatus::Continued(_)) | Ok(WaitStatus::Stopped(_, _)) => (),
137                Err(nix::Error::EINTR) => (),
138                e => panic!("failed to waitpid on {pid}: {e:?}"),
139            }
140        }
141    });
142
143    // Northstar runs in a event loop
144    let (event_tx, mut event_rx) = mpsc::channel::<Event>(config.event_buffer_size);
145    let (notification_tx, _) = sync::broadcast::channel(config.notification_buffer_size);
146
147    // Initialize the console if bind address configured.
148    let console = if let Some(global) = config.console.global.take() {
149        let mut console = console::Console::new(event_tx.clone(), notification_tx.clone());
150        let options = global.options.unwrap_or_default();
151        let permissions = global.permissions;
152        console
153            .listen(&global.bind, options.into(), permissions.into())
154            .await?;
155        Some(console)
156    } else {
157        None
158    };
159
160    // Destructure the forker stream handle: Merge the exit notification into the main channel
161    // and create a handle to the foker process to be used in the state module;
162    let Streams {
163        command_stream,
164        socket_stream,
165        notification_stream,
166    } = forker_channels;
167    let forker = fork::Forker::new(command_stream, socket_stream);
168
169    // Merge the exit notification from the forker process with other events into the main loop channel
170    let event_rx = stream! {
171        let mut exit_notifications = AsyncFramedUnixStream::new(notification_stream);
172        loop {
173            select! {
174                Some(event) = event_rx.recv() => yield event,
175                Ok(Some(fork::Notification::Exit { container, exit_status })) = exit_notifications.recv() => {
176                    let event = ContainerEvent::Exit(exit_status);
177                    yield Event::Container(container, event);
178                }
179                else => unimplemented!(),
180            }
181        }
182    };
183    pin!(event_rx);
184
185    let mut state = State::new(config, event_tx.clone(), notification_tx, forker).await?;
186
187    info!("Runtime up and running");
188
189    // Enter main loop
190    loop {
191        tokio::select! {
192            // External shutdown event via the token
193            _ = token.cancelled() => event_tx.send(Event::Shutdown).await.expect("failed to send shutdown event"),
194            // Process events
195            event = event_rx.next() => {
196                if let Err(e) = match event.expect("internal error") {
197                    // Process console events enqueued by console::Console
198                    Event::Console(request, response) => state.on_request(request, response).await,
199                    // The runtime os commanded to shut down and exit.
200                    Event::Shutdown => {
201                        debug!("Shutting down Northstar runtime");
202                        if let Some(console) = console {
203                            debug!("Shutting down console");
204                            console.shutdown().await?;
205                        }
206                        break state.shutdown(event_rx).await;
207                    }
208                    // Container event
209                    Event::Container(container, event) => state.on_event(&container, &event, false).await,
210                } {
211                    break Err(e);
212                }
213            }
214            exit_status = &mut join_forker => panic!("Forker exited with {exit_status:?}"),
215        }
216    }?;
217
218    // Terminate forker process
219    debug!("Joining forker with pid {}", forker_pid);
220    join_forker.await.expect("failed to join forker");
221
222    info!("Shutting down cgroups");
223    cgroups::shutdown(&cgroup)
224        .await
225        .expect("failed to shutdown cgroups");
226
227    info!("Shutdown complete");
228
229    Ok(())
230}