northstar_runtime/runtime/console/
mod.rs

1use crate::{
2    api::{self, codec::Framed, VERSION},
3    common::container::Container,
4    runtime::{
5        events::{CGroupEvent, ContainerEvent, Event, EventTx},
6        exit_status::ExitStatus,
7        repository::RepositoryId,
8        runtime::NotificationTx,
9        token::Token,
10    },
11};
12use anyhow::{bail, Context, Result};
13use api::model;
14use async_stream::stream;
15use bytes::{Buf, Bytes};
16use futures::{
17    future::join_all,
18    stream::{self, FuturesUnordered},
19    Stream, StreamExt,
20};
21use listener::Listener;
22use log::{debug, info, trace, warn};
23use semver::Comparator;
24use std::{cmp::min, fmt, path::Path, unreachable};
25use tokio::{
26    io::{self, AsyncRead, AsyncReadExt, AsyncWrite},
27    pin, select,
28    sync::{broadcast, mpsc, oneshot},
29    task, time,
30};
31use tokio_util::{either::Either, io::ReaderStream, sync::CancellationToken};
32use url::Url;
33
34pub use options::Options;
35use permissions::Permission;
36pub use permissions::Permissions;
37
38mod listener;
39mod options;
40mod permissions;
41mod throttle;
42
43// Request from the main loop to the console
44#[derive(Debug)]
45pub(crate) enum Request {
46    Request(model::Request),
47    Install(RepositoryId, mpsc::Receiver<Bytes>),
48}
49
50/// A console is responsible for monitoring and serving incoming client connections
51/// It feeds relevant events back to the runtime and forwards responses and notifications
52/// to connected clients
53pub(crate) struct Console {
54    /// Tx handle to the main loop
55    event_tx: EventTx,
56    /// Broadcast channel passed to connections to forward notifications
57    notification_tx: NotificationTx,
58    /// Shutdown the console by canceling this token
59    stop: CancellationToken,
60    /// Listener tasks. Currently there's just one task but when the console
61    /// is exposed to containers via unix sockets this list will grow
62    tasks: Vec<task::JoinHandle<()>>,
63}
64
65impl Console {
66    /// Construct a new console instance
67    pub(super) fn new(event_tx: EventTx, notification_tx: NotificationTx) -> Console {
68        Self {
69            event_tx,
70            notification_tx,
71            stop: CancellationToken::new(),
72            tasks: Vec::new(),
73        }
74    }
75
76    /// Spawn a task that listens on `url` for new connections. Spawn a task for
77    /// each client
78    pub(super) async fn listen(
79        &mut self,
80        url: &Url,
81        options: Options,
82        permissions: Permissions,
83    ) -> Result<()> {
84        let event_tx = self.event_tx.clone();
85        let notification_tx = self.notification_tx.clone();
86        // Stop token for self *and* the connections
87        let stop = self.stop.clone();
88
89        debug!("Starting console on {url} with permissions \"{permissions}\"",);
90        let listener = Listener::new(url)
91            .await
92            .context("failed to start console listener")?;
93
94        let task = match listener {
95            Listener::Tcp(listener) => task::spawn(async move {
96                serve(
97                    Box::pin(stream! { loop { yield listener.accept().await; } }),
98                    event_tx,
99                    notification_tx,
100                    stop,
101                    options,
102                    permissions,
103                )
104                .await
105            }),
106            Listener::Unix(listener) => task::spawn(async move {
107                serve(
108                    Box::pin(stream! { loop { yield listener.accept().await; } }),
109                    event_tx,
110                    notification_tx,
111                    stop,
112                    options,
113                    permissions,
114                )
115                .await
116            }),
117        };
118
119        self.tasks.push(task);
120
121        Ok(())
122    }
123
124    /// Stop the listeners and wait for their shutdown
125    pub(super) async fn shutdown(self) -> Result<()> {
126        self.stop.cancel();
127        join_all(self.tasks).await;
128        Ok(())
129    }
130
131    #[allow(clippy::too_many_arguments)]
132    pub(super) async fn connection<T: AsyncRead + AsyncWrite + Unpin>(
133        stream: T,
134        peer: Peer,
135        stop: CancellationToken,
136        container: Option<Container>,
137        options: Options,
138        permissions: Permissions,
139        event_tx: EventTx,
140        mut notification_rx: broadcast::Receiver<(Container, ContainerEvent)>,
141        timeout: Option<time::Duration>,
142    ) -> Result<()> {
143        if let Some(container) = &container {
144            debug!(
145                "Container {} connected with permissions {}",
146                container, permissions
147            );
148        } else {
149            debug!("Client {} connected with permissions {}", peer, permissions);
150        }
151
152        // Get a framed stream and sink interface.
153        let max_request_size = options.max_request_size;
154        let stream = api::codec::framed_with_max_length(stream, max_request_size.try_into()?);
155
156        // Limit requests per second
157        let max_requests_per_sec = options.max_requests_per_sec;
158        const SECOND: time::Duration = time::Duration::from_secs(1);
159        let mut stream = throttle::Throttle::new(stream, max_requests_per_sec, SECOND);
160
161        // Wait for a connect message within timeout
162        let connect = stream.next();
163        // TODO: This can for sure be done nicer
164        let timeout = timeout.unwrap_or_else(|| time::Duration::from_secs(u64::MAX));
165        let connect = time::timeout(timeout, connect);
166        let (protocol_version, notifications) = match connect.await {
167            Ok(Some(Ok(m))) => match m {
168                model::Message::Connect {
169                    connect:
170                        model::Connect {
171                            version,
172                            subscribe_notifications,
173                        },
174                } => (version, subscribe_notifications),
175                _ => {
176                    warn!("{}: Received {:?} instead of Connect", peer, m);
177                    return Ok(());
178                }
179            },
180            Ok(Some(Err(e))) => {
181                warn!("{}: Connection error: {}", peer, e);
182                return Ok(());
183            }
184            Ok(None) => {
185                info!("{}: Connection closed before connect", peer);
186                return Ok(());
187            }
188            Err(_) => {
189                info!("{}: Connection timed out", peer);
190                return Ok(());
191            }
192        };
193
194        // Check protocol version from connect message against local model version
195        let version_request = semver::VersionReq {
196            comparators: vec![Comparator {
197                op: semver::Op::GreaterEq,
198                major: VERSION.major,
199                minor: Some(VERSION.minor),
200                patch: None,
201                pre: semver::Prerelease::default(),
202            }],
203        };
204        let protocol_version = &protocol_version;
205        if !version_request.matches(&(protocol_version.into())) {
206            warn!(
207                "{}: Client connected with insufficent protocol version {}. Expected {}. Disconnecting...",
208                peer, protocol_version, VERSION
209            );
210            // Send a ConnectNack and return -> closes the connection
211            let connect_nack = model::ConnectNack::InvalidProtocolVersion { version: VERSION };
212            let message = model::Message::ConnectNack { connect_nack };
213            stream.send(message).await.ok();
214            return Ok(());
215        }
216
217        // Check notification permission if the client want's to subscribe to
218        // notifications
219        if notifications && !permissions.contains(&Permission::Notifications) {
220            warn!(
221                "{}: Requested notifications without notification permission. Disconnecting...",
222                peer
223            );
224            // Send a ConnectNack and return -> closes the connection
225            let connect_nack = model::ConnectNack::PermissionDenied;
226            let message = model::Message::ConnectNack { connect_nack };
227            stream.send(message).await.ok();
228            return Ok(());
229        }
230
231        // Looks good - send ConnectAck
232        let message = model::Message::ConnectAck {
233            connect_ack: model::ConnectAck,
234        };
235        if let Err(e) = stream.send(message).await {
236            warn!("{}: Connection error: {}", peer, e);
237            return Ok(());
238        }
239
240        // Notification input: If the client subscribe create a stream from the broadcast
241        // receiver and otherwise drop it
242        let notifications = if notifications {
243            debug!("Client {} subscribed to notifications", peer);
244            let stream = stream! { loop { yield notification_rx.recv().await; } };
245            Either::Left(stream)
246        } else {
247            drop(notification_rx);
248            Either::Right(stream::pending())
249        };
250        pin!(notifications);
251
252        loop {
253            select! {
254                _ = stop.cancelled() => {
255                    info!("{}: Closing connection", peer);
256                    break;
257                }
258                notification = notifications.next() => {
259                    // Process notifications received via the notification
260                    // broadcast channel
261                    let notification = match notification {
262                        Some(Ok((container, event))) => (container, event).into(),
263                        Some(Err(broadcast::error::RecvError::Closed)) => break,
264                        Some(Err(broadcast::error::RecvError::Lagged(_))) => {
265                            warn!("Client connection lagged notifications. Closing");
266                            break;
267                        }
268                        None => break,
269                    };
270
271                    if let Err(e) = stream
272                        .send(api::model::Message::Notification {notification })
273                        .await
274                    {
275                        warn!("{}: Connection error: {}", peer, e);
276                        break;
277                    }
278                }
279                item = stream.next() => {
280                    match item {
281                        Some(Ok(model::Message::Request { request })) => {
282                            trace!("{}: --> {:?}", peer, request);
283                            let response = match process_request(&peer, &mut stream, &stop, &options, &permissions, &event_tx, request).await {
284                                Ok(response) => response,
285                                Err(e) => {
286                                    warn!("Failed to process request: {}", e);
287                                    break;
288                                }
289                            };
290                            trace!("{}: <-- {:?}", peer, response);
291
292                            if let Err(e) = stream.send(response).await {
293                                warn!("{}: Connection error: {}", peer, e);
294                                break;
295                            }
296                        }
297                        Some(Ok(message)) => {
298                            warn!("{}: Unexpected message: {:?}. Disconnecting...", peer, message);
299                            break;
300                        }
301                        Some(Err(e)) => {
302                            warn!("{}: Connection error: {:?}. Disconnecting...", peer, e);
303                            break;
304                        }
305                        None => break,
306                    }
307                }
308            }
309        }
310
311        info!("{}: Connection closed", peer);
312
313        Ok(())
314    }
315}
316
317/// Process a request
318///
319/// # Errors
320///
321/// If the streamed NPK is not valid and parsable a `Error::Npk(..)` is returned.
322/// If the event loop is closed due to shutdown, this function will return `Error::EventLoopClosed`.
323///
324async fn process_request<S>(
325    peer: &Peer,
326    stream: &mut Framed<S>,
327    stop: &CancellationToken,
328    options: &Options,
329    permissions: &Permissions,
330    event_loop: &EventTx,
331    request: model::Request,
332) -> Result<model::Message>
333where
334    S: AsyncRead + Unpin,
335{
336    let required_permission = match &request {
337        model::Request::Ident { .. } => Permission::Ident,
338        model::Request::Inspect { .. } => Permission::Inspect,
339        model::Request::Install { .. } => Permission::Install,
340        model::Request::Kill { .. } => Permission::Kill,
341        model::Request::List => Permission::List,
342        model::Request::Mount { .. } => Permission::Mount,
343        model::Request::Repositories => Permission::Repositories,
344        model::Request::Shutdown => Permission::Shutdown,
345        model::Request::Start {
346            init,
347            arguments,
348            environment,
349            ..
350        } if init.is_none() && arguments.is_empty() && environment.is_empty() => Permission::Start,
351        model::Request::Start { .. } => Permission::StartCommand,
352        model::Request::TokenCreate { .. } => Permission::TokenCreate,
353        model::Request::TokenVerify { .. } => Permission::TokenVerification,
354        model::Request::Umount { .. } => Permission::Umount,
355        model::Request::Uninstall { .. } => Permission::Uninstall,
356    };
357
358    if !permissions.contains(&required_permission) {
359        return Ok(model::Message::Response {
360            response: model::Response::PermissionDenied(request),
361        });
362    }
363
364    let (reply_tx, reply_rx) = oneshot::channel();
365    match request {
366        model::Request::Ident => {
367            let ident = match peer {
368                #[allow(clippy::unwrap_used)]
369                Peer::Extern(_) => Container::try_from("extern:0.0.0").unwrap(),
370                Peer::Container(container) => container.clone(),
371            };
372            let response = api::model::Response::Ident(ident);
373            reply_tx.send(response).ok();
374        }
375        model::Request::Install {
376            repository,
377            mut size,
378        } => {
379            debug!(
380                "{}: Received installation request with size {}",
381                peer,
382                bytesize::ByteSize::b(size)
383            );
384
385            // Check the installation request size
386            let max_install_stream_size = options.max_npk_install_size;
387            if size > max_install_stream_size {
388                bail!("npk size too large");
389            }
390
391            info!("{}: Using repository \"{}\"", peer, repository);
392
393            // Send a Receiver<Bytes> to the runtime and forward n bytes to this channel
394            let (tx, rx) = mpsc::channel(10);
395            let request = Request::Install(repository, rx);
396            trace!("    {:?} -> event loop", request);
397            let event = Event::Console(request, reply_tx);
398            event_loop.send(event).await?;
399
400            // The codec might have pulled bytes in the the read buffer of the connection.
401            if !stream.read_buffer().is_empty() {
402                let available = stream.read_buffer().len();
403                // Limit the first read operation to `size` if there's more data available.
404                // If `size` bytes are available, `size` is decremented to 0 and the following
405                // while let loop breaks.
406                let read_max = min(size as usize, available);
407                let buffer = stream.read_buffer_mut().copy_to_bytes(read_max);
408                size -= buffer.len() as u64;
409                tx.send(buffer).await.ok();
410            }
411
412            // If the connections breaks: just break. If the receiver is dropped: just break.
413            let mut take = ReaderStream::with_capacity(stream.get_mut().take(size), 1024 * 1024);
414            let timeout = options.npk_stream_timeout;
415            while let Some(buf) = time::timeout(timeout, take.next())
416                .await
417                .context("npk stream timeout")?
418            {
419                let buf = buf.context("npk steam")?;
420                // Ignore any sending error because the stream needs to be drained for `size` bytes.
421                tx.send(buf).await.ok();
422            }
423        }
424        model::Request::TokenCreate { target, shared } => {
425            let user = match peer {
426                Peer::Extern(_) => "extern",
427                Peer::Container(container) => container.name().as_ref(),
428            };
429            info!(
430                "Creating token for user \"{}\" and target \"{}\" with shared \"{}\"",
431                user,
432                target,
433                hex::encode(&shared)
434            );
435            let token_validity = options.token_validity;
436            let token: Vec<u8> =
437                Token::new(token_validity, user, target.as_ref().as_bytes(), shared).into();
438            let token = api::model::Token::from(token);
439            let response = api::model::Response::Token(token);
440            reply_tx.send(response).ok();
441        }
442        model::Request::TokenVerify {
443            token,
444            user,
445            shared,
446        } => {
447            // The target is the container name, this connection belongs to.
448            let target = match peer {
449                Peer::Extern(_) => "extern",
450                Peer::Container(container) => container.name().as_ref(),
451            };
452            info!(
453                "Verifiying token for user \"{}\" and target \"{}\" with shared \"{}\"",
454                user,
455                target,
456                hex::encode(&shared)
457            );
458            // The token has a valid length - verified by serde::deserialize
459            let token_validity = options.token_validity;
460            let token = Token::from((token_validity, token.as_ref().to_vec()));
461            let result = token
462                .verify(user.as_ref().as_bytes(), target, &shared)
463                .into();
464            let response = api::model::Response::TokenVerification(result);
465            reply_tx.send(response).ok();
466        }
467        request => {
468            let message = Request::Request(request);
469            trace!("    {:?} -> event loop", message);
470            let event = Event::Console(message, reply_tx);
471            event_loop.send(event).await?;
472        }
473    }
474
475    (select! {
476        reply = reply_rx => reply.context("failed to receive reply"),
477        _ = stop.cancelled() => bail!("shutdown"), // There can be a shutdown while we're waiting for an reply
478    })
479    .map(|response| {
480        trace!("    {:?} <- event loop", response);
481        response
482    })
483    .map(|response| model::Message::Response { response })
484}
485
486/// Function to handle connections
487///
488/// Generic handling of connections. The first parameter is a function that when called awaits for
489/// a new connection. The connections are represented as a pair of a stream and some client
490/// identifier.
491///
492/// All the connections container stored the tasks corresponding to each active connection. As
493/// these tasks terminate, they are removed from the connections container. Once a stop is issued,
494/// the termination of the remaining connections will be awaited.
495///
496async fn serve<C, S, A>(
497    connections_stream: C,
498    event_tx: EventTx,
499    notification_tx: NotificationTx,
500    stop: CancellationToken,
501    options: Options,
502    permissions: Permissions,
503) where
504    C: Stream<Item = Result<(S, A), io::Error>> + Unpin,
505    S: AsyncWrite + AsyncRead + Unpin + Send + 'static,
506    A: Into<Peer>,
507{
508    let mut connections_stream = Box::pin(connections_stream);
509    let mut connections = FuturesUnordered::new();
510    loop {
511        select! {
512            _ = connections.next(), if !connections.is_empty() => (), // Removes closed connections
513            // If event_tx is closed then the runtime is shutting down therefore no new connections
514            // are accepted.
515            connection = connections_stream.next(), if !event_tx.is_closed() && !stop.is_cancelled() => {
516                match connection {
517                    Some(Ok((stream, client))) => {
518                        connections.push(
519                        task::spawn(Console::connection(
520                            stream,
521                            client.into(),
522                            stop.clone(),
523                            None,
524                            options.clone(),
525                            permissions.clone(),
526                            event_tx.clone(),
527                            notification_tx.subscribe(),
528                            Some(time::Duration::from_secs(10)),
529                        )));
530                    }
531                    Some(Err(e)) => {
532                        warn!("Error listening: {:?}", e);
533                        break;
534                    }
535                    None => break,
536                }
537            }
538            _ = stop.cancelled() => {
539                if !connections.is_empty() {
540                    debug!("Waiting for open connections");
541                    while connections.next().await.is_some() {};
542                }
543                break;
544            }
545        }
546    }
547    debug!("Closed listener");
548}
549
550pub enum Peer {
551    Extern(Url),
552    Container(Container),
553}
554
555impl From<std::net::SocketAddr> for Peer {
556    fn from(socket: std::net::SocketAddr) -> Self {
557        match socket.ip() {
558            std::net::IpAddr::V4(ip) => Url::parse(&format!("tcp://{}:{}", ip, socket.port()))
559                .map(Peer::Extern)
560                .expect("internal error"),
561            std::net::IpAddr::V6(ip) => Url::parse(&format!("tcp://[{}]:{}", ip, socket.port()))
562                .map(Peer::Extern)
563                .expect("internal error"),
564        }
565    }
566}
567
568impl From<tokio::net::unix::SocketAddr> for Peer {
569    fn from(socket: tokio::net::unix::SocketAddr) -> Self {
570        let path = socket
571            .as_pathname()
572            .unwrap_or_else(|| Path::new("unnamed"))
573            .display();
574        Url::parse(&format!("unix://{path}"))
575            .map(Peer::Extern)
576            .expect("invalid url")
577    }
578}
579
580impl fmt::Display for Peer {
581    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
582        match self {
583            Peer::Extern(url) => write!(f, "Remote({url})"),
584            Peer::Container(container) => write!(f, "Container({container})"),
585        }
586    }
587}
588
589impl From<ExitStatus> for model::ExitStatus {
590    fn from(e: ExitStatus) -> Self {
591        match e {
592            ExitStatus::Exit(code) => api::model::ExitStatus::Exit { code },
593            ExitStatus::Signalled(signal) => api::model::ExitStatus::Signalled {
594                signal: signal as u32,
595            },
596        }
597    }
598}
599
600impl From<(Container, ContainerEvent)> for model::Notification {
601    fn from(p: (Container, ContainerEvent)) -> model::Notification {
602        let container = p.0.clone();
603        match p.1 {
604            ContainerEvent::Started => api::model::Notification::Started(container),
605            ContainerEvent::Exit(status) => {
606                api::model::Notification::Exit(container, status.into())
607            }
608            ContainerEvent::Installed => api::model::Notification::Install(container),
609            ContainerEvent::Uninstalled => api::model::Notification::Uninstall(container),
610            ContainerEvent::CGroup(event) => match event {
611                CGroupEvent::Memory(memory) => api::model::Notification::CGroup(
612                    container,
613                    api::model::CgroupNotification::Memory(api::model::MemoryNotification {
614                        low: memory.low,
615                        high: memory.high,
616                        max: memory.max,
617                        oom: memory.oom,
618                        oom_kill: memory.oom_kill,
619                    }),
620                ),
621            },
622        }
623    }
624}