1use crate::{
2 api::{self, codec::Framed, VERSION},
3 common::container::Container,
4 runtime::{
5 events::{CGroupEvent, ContainerEvent, Event, EventTx},
6 exit_status::ExitStatus,
7 repository::RepositoryId,
8 runtime::NotificationTx,
9 token::Token,
10 },
11};
12use anyhow::{bail, Context, Result};
13use api::model;
14use async_stream::stream;
15use bytes::{Buf, Bytes};
16use futures::{
17 future::join_all,
18 stream::{self, FuturesUnordered},
19 Stream, StreamExt,
20};
21use listener::Listener;
22use log::{debug, info, trace, warn};
23use semver::Comparator;
24use std::{cmp::min, fmt, path::Path, unreachable};
25use tokio::{
26 io::{self, AsyncRead, AsyncReadExt, AsyncWrite},
27 pin, select,
28 sync::{broadcast, mpsc, oneshot},
29 task, time,
30};
31use tokio_util::{either::Either, io::ReaderStream, sync::CancellationToken};
32use url::Url;
33
34pub use options::Options;
35use permissions::Permission;
36pub use permissions::Permissions;
37
38mod listener;
39mod options;
40mod permissions;
41mod throttle;
42
/// Message a console connection hands to the runtime event loop.
#[derive(Debug)]
pub(crate) enum Request {
    /// An API request that is forwarded to the event loop unchanged.
    Request(model::Request),
    /// An installation into the given repository. The receiver yields the
    /// chunks of the npk data as they are read from the client connection.
    Install(RepositoryId, mpsc::Receiver<Bytes>),
}
49
/// Console server: owns the channels shared with every listener and the
/// handles required to stop the listener tasks again.
pub(crate) struct Console {
    /// Sender for events addressed to the runtime event loop.
    event_tx: EventTx,
    /// Sender every accepted connection subscribes to for notifications.
    notification_tx: NotificationTx,
    /// Token that is cancelled on shutdown; a clone is given to each listener.
    stop: CancellationToken,
    /// Join handles of the listener tasks spawned by `listen`.
    tasks: Vec<task::JoinHandle<()>>,
}
64
65impl Console {
66 pub(super) fn new(event_tx: EventTx, notification_tx: NotificationTx) -> Console {
68 Self {
69 event_tx,
70 notification_tx,
71 stop: CancellationToken::new(),
72 tasks: Vec::new(),
73 }
74 }
75
76 pub(super) async fn listen(
79 &mut self,
80 url: &Url,
81 options: Options,
82 permissions: Permissions,
83 ) -> Result<()> {
84 let event_tx = self.event_tx.clone();
85 let notification_tx = self.notification_tx.clone();
86 let stop = self.stop.clone();
88
89 debug!("Starting console on {url} with permissions \"{permissions}\"",);
90 let listener = Listener::new(url)
91 .await
92 .context("failed to start console listener")?;
93
94 let task = match listener {
95 Listener::Tcp(listener) => task::spawn(async move {
96 serve(
97 Box::pin(stream! { loop { yield listener.accept().await; } }),
98 event_tx,
99 notification_tx,
100 stop,
101 options,
102 permissions,
103 )
104 .await
105 }),
106 Listener::Unix(listener) => task::spawn(async move {
107 serve(
108 Box::pin(stream! { loop { yield listener.accept().await; } }),
109 event_tx,
110 notification_tx,
111 stop,
112 options,
113 permissions,
114 )
115 .await
116 }),
117 };
118
119 self.tasks.push(task);
120
121 Ok(())
122 }
123
124 pub(super) async fn shutdown(self) -> Result<()> {
126 self.stop.cancel();
127 join_all(self.tasks).await;
128 Ok(())
129 }
130
131 #[allow(clippy::too_many_arguments)]
132 pub(super) async fn connection<T: AsyncRead + AsyncWrite + Unpin>(
133 stream: T,
134 peer: Peer,
135 stop: CancellationToken,
136 container: Option<Container>,
137 options: Options,
138 permissions: Permissions,
139 event_tx: EventTx,
140 mut notification_rx: broadcast::Receiver<(Container, ContainerEvent)>,
141 timeout: Option<time::Duration>,
142 ) -> Result<()> {
143 if let Some(container) = &container {
144 debug!(
145 "Container {} connected with permissions {}",
146 container, permissions
147 );
148 } else {
149 debug!("Client {} connected with permissions {}", peer, permissions);
150 }
151
152 let max_request_size = options.max_request_size;
154 let stream = api::codec::framed_with_max_length(stream, max_request_size.try_into()?);
155
156 let max_requests_per_sec = options.max_requests_per_sec;
158 const SECOND: time::Duration = time::Duration::from_secs(1);
159 let mut stream = throttle::Throttle::new(stream, max_requests_per_sec, SECOND);
160
161 let connect = stream.next();
163 let timeout = timeout.unwrap_or_else(|| time::Duration::from_secs(u64::MAX));
165 let connect = time::timeout(timeout, connect);
166 let (protocol_version, notifications) = match connect.await {
167 Ok(Some(Ok(m))) => match m {
168 model::Message::Connect {
169 connect:
170 model::Connect {
171 version,
172 subscribe_notifications,
173 },
174 } => (version, subscribe_notifications),
175 _ => {
176 warn!("{}: Received {:?} instead of Connect", peer, m);
177 return Ok(());
178 }
179 },
180 Ok(Some(Err(e))) => {
181 warn!("{}: Connection error: {}", peer, e);
182 return Ok(());
183 }
184 Ok(None) => {
185 info!("{}: Connection closed before connect", peer);
186 return Ok(());
187 }
188 Err(_) => {
189 info!("{}: Connection timed out", peer);
190 return Ok(());
191 }
192 };
193
194 let version_request = semver::VersionReq {
196 comparators: vec![Comparator {
197 op: semver::Op::GreaterEq,
198 major: VERSION.major,
199 minor: Some(VERSION.minor),
200 patch: None,
201 pre: semver::Prerelease::default(),
202 }],
203 };
204 let protocol_version = &protocol_version;
205 if !version_request.matches(&(protocol_version.into())) {
206 warn!(
207 "{}: Client connected with insufficent protocol version {}. Expected {}. Disconnecting...",
208 peer, protocol_version, VERSION
209 );
210 let connect_nack = model::ConnectNack::InvalidProtocolVersion { version: VERSION };
212 let message = model::Message::ConnectNack { connect_nack };
213 stream.send(message).await.ok();
214 return Ok(());
215 }
216
217 if notifications && !permissions.contains(&Permission::Notifications) {
220 warn!(
221 "{}: Requested notifications without notification permission. Disconnecting...",
222 peer
223 );
224 let connect_nack = model::ConnectNack::PermissionDenied;
226 let message = model::Message::ConnectNack { connect_nack };
227 stream.send(message).await.ok();
228 return Ok(());
229 }
230
231 let message = model::Message::ConnectAck {
233 connect_ack: model::ConnectAck,
234 };
235 if let Err(e) = stream.send(message).await {
236 warn!("{}: Connection error: {}", peer, e);
237 return Ok(());
238 }
239
240 let notifications = if notifications {
243 debug!("Client {} subscribed to notifications", peer);
244 let stream = stream! { loop { yield notification_rx.recv().await; } };
245 Either::Left(stream)
246 } else {
247 drop(notification_rx);
248 Either::Right(stream::pending())
249 };
250 pin!(notifications);
251
252 loop {
253 select! {
254 _ = stop.cancelled() => {
255 info!("{}: Closing connection", peer);
256 break;
257 }
258 notification = notifications.next() => {
259 let notification = match notification {
262 Some(Ok((container, event))) => (container, event).into(),
263 Some(Err(broadcast::error::RecvError::Closed)) => break,
264 Some(Err(broadcast::error::RecvError::Lagged(_))) => {
265 warn!("Client connection lagged notifications. Closing");
266 break;
267 }
268 None => break,
269 };
270
271 if let Err(e) = stream
272 .send(api::model::Message::Notification {notification })
273 .await
274 {
275 warn!("{}: Connection error: {}", peer, e);
276 break;
277 }
278 }
279 item = stream.next() => {
280 match item {
281 Some(Ok(model::Message::Request { request })) => {
282 trace!("{}: --> {:?}", peer, request);
283 let response = match process_request(&peer, &mut stream, &stop, &options, &permissions, &event_tx, request).await {
284 Ok(response) => response,
285 Err(e) => {
286 warn!("Failed to process request: {}", e);
287 break;
288 }
289 };
290 trace!("{}: <-- {:?}", peer, response);
291
292 if let Err(e) = stream.send(response).await {
293 warn!("{}: Connection error: {}", peer, e);
294 break;
295 }
296 }
297 Some(Ok(message)) => {
298 warn!("{}: Unexpected message: {:?}. Disconnecting...", peer, message);
299 break;
300 }
301 Some(Err(e)) => {
302 warn!("{}: Connection error: {:?}. Disconnecting...", peer, e);
303 break;
304 }
305 None => break,
306 }
307 }
308 }
309 }
310
311 info!("{}: Connection closed", peer);
312
313 Ok(())
314 }
315}
316
317async fn process_request<S>(
325 peer: &Peer,
326 stream: &mut Framed<S>,
327 stop: &CancellationToken,
328 options: &Options,
329 permissions: &Permissions,
330 event_loop: &EventTx,
331 request: model::Request,
332) -> Result<model::Message>
333where
334 S: AsyncRead + Unpin,
335{
336 let required_permission = match &request {
337 model::Request::Ident { .. } => Permission::Ident,
338 model::Request::Inspect { .. } => Permission::Inspect,
339 model::Request::Install { .. } => Permission::Install,
340 model::Request::Kill { .. } => Permission::Kill,
341 model::Request::List => Permission::List,
342 model::Request::Mount { .. } => Permission::Mount,
343 model::Request::Repositories => Permission::Repositories,
344 model::Request::Shutdown => Permission::Shutdown,
345 model::Request::Start {
346 init,
347 arguments,
348 environment,
349 ..
350 } if init.is_none() && arguments.is_empty() && environment.is_empty() => Permission::Start,
351 model::Request::Start { .. } => Permission::StartCommand,
352 model::Request::TokenCreate { .. } => Permission::TokenCreate,
353 model::Request::TokenVerify { .. } => Permission::TokenVerification,
354 model::Request::Umount { .. } => Permission::Umount,
355 model::Request::Uninstall { .. } => Permission::Uninstall,
356 };
357
358 if !permissions.contains(&required_permission) {
359 return Ok(model::Message::Response {
360 response: model::Response::PermissionDenied(request),
361 });
362 }
363
364 let (reply_tx, reply_rx) = oneshot::channel();
365 match request {
366 model::Request::Ident => {
367 let ident = match peer {
368 #[allow(clippy::unwrap_used)]
369 Peer::Extern(_) => Container::try_from("extern:0.0.0").unwrap(),
370 Peer::Container(container) => container.clone(),
371 };
372 let response = api::model::Response::Ident(ident);
373 reply_tx.send(response).ok();
374 }
375 model::Request::Install {
376 repository,
377 mut size,
378 } => {
379 debug!(
380 "{}: Received installation request with size {}",
381 peer,
382 bytesize::ByteSize::b(size)
383 );
384
385 let max_install_stream_size = options.max_npk_install_size;
387 if size > max_install_stream_size {
388 bail!("npk size too large");
389 }
390
391 info!("{}: Using repository \"{}\"", peer, repository);
392
393 let (tx, rx) = mpsc::channel(10);
395 let request = Request::Install(repository, rx);
396 trace!(" {:?} -> event loop", request);
397 let event = Event::Console(request, reply_tx);
398 event_loop.send(event).await?;
399
400 if !stream.read_buffer().is_empty() {
402 let available = stream.read_buffer().len();
403 let read_max = min(size as usize, available);
407 let buffer = stream.read_buffer_mut().copy_to_bytes(read_max);
408 size -= buffer.len() as u64;
409 tx.send(buffer).await.ok();
410 }
411
412 let mut take = ReaderStream::with_capacity(stream.get_mut().take(size), 1024 * 1024);
414 let timeout = options.npk_stream_timeout;
415 while let Some(buf) = time::timeout(timeout, take.next())
416 .await
417 .context("npk stream timeout")?
418 {
419 let buf = buf.context("npk steam")?;
420 tx.send(buf).await.ok();
422 }
423 }
424 model::Request::TokenCreate { target, shared } => {
425 let user = match peer {
426 Peer::Extern(_) => "extern",
427 Peer::Container(container) => container.name().as_ref(),
428 };
429 info!(
430 "Creating token for user \"{}\" and target \"{}\" with shared \"{}\"",
431 user,
432 target,
433 hex::encode(&shared)
434 );
435 let token_validity = options.token_validity;
436 let token: Vec<u8> =
437 Token::new(token_validity, user, target.as_ref().as_bytes(), shared).into();
438 let token = api::model::Token::from(token);
439 let response = api::model::Response::Token(token);
440 reply_tx.send(response).ok();
441 }
442 model::Request::TokenVerify {
443 token,
444 user,
445 shared,
446 } => {
447 let target = match peer {
449 Peer::Extern(_) => "extern",
450 Peer::Container(container) => container.name().as_ref(),
451 };
452 info!(
453 "Verifiying token for user \"{}\" and target \"{}\" with shared \"{}\"",
454 user,
455 target,
456 hex::encode(&shared)
457 );
458 let token_validity = options.token_validity;
460 let token = Token::from((token_validity, token.as_ref().to_vec()));
461 let result = token
462 .verify(user.as_ref().as_bytes(), target, &shared)
463 .into();
464 let response = api::model::Response::TokenVerification(result);
465 reply_tx.send(response).ok();
466 }
467 request => {
468 let message = Request::Request(request);
469 trace!(" {:?} -> event loop", message);
470 let event = Event::Console(message, reply_tx);
471 event_loop.send(event).await?;
472 }
473 }
474
475 (select! {
476 reply = reply_rx => reply.context("failed to receive reply"),
477 _ = stop.cancelled() => bail!("shutdown"), })
479 .map(|response| {
480 trace!(" {:?} <- event loop", response);
481 response
482 })
483 .map(|response| model::Message::Response { response })
484}
485
486async fn serve<C, S, A>(
497 connections_stream: C,
498 event_tx: EventTx,
499 notification_tx: NotificationTx,
500 stop: CancellationToken,
501 options: Options,
502 permissions: Permissions,
503) where
504 C: Stream<Item = Result<(S, A), io::Error>> + Unpin,
505 S: AsyncWrite + AsyncRead + Unpin + Send + 'static,
506 A: Into<Peer>,
507{
508 let mut connections_stream = Box::pin(connections_stream);
509 let mut connections = FuturesUnordered::new();
510 loop {
511 select! {
512 _ = connections.next(), if !connections.is_empty() => (), connection = connections_stream.next(), if !event_tx.is_closed() && !stop.is_cancelled() => {
516 match connection {
517 Some(Ok((stream, client))) => {
518 connections.push(
519 task::spawn(Console::connection(
520 stream,
521 client.into(),
522 stop.clone(),
523 None,
524 options.clone(),
525 permissions.clone(),
526 event_tx.clone(),
527 notification_tx.subscribe(),
528 Some(time::Duration::from_secs(10)),
529 )));
530 }
531 Some(Err(e)) => {
532 warn!("Error listening: {:?}", e);
533 break;
534 }
535 None => break,
536 }
537 }
538 _ = stop.cancelled() => {
539 if !connections.is_empty() {
540 debug!("Waiting for open connections");
541 while connections.next().await.is_some() {};
542 }
543 break;
544 }
545 }
546 }
547 debug!("Closed listener");
548}
549
/// The remote end of a console connection.
pub enum Peer {
    /// A client connected via a listener, identified by the URL of its address.
    Extern(Url),
    /// A connection that belongs to a container.
    Container(Container),
}
554
555impl From<std::net::SocketAddr> for Peer {
556 fn from(socket: std::net::SocketAddr) -> Self {
557 match socket.ip() {
558 std::net::IpAddr::V4(ip) => Url::parse(&format!("tcp://{}:{}", ip, socket.port()))
559 .map(Peer::Extern)
560 .expect("internal error"),
561 std::net::IpAddr::V6(ip) => Url::parse(&format!("tcp://[{}]:{}", ip, socket.port()))
562 .map(Peer::Extern)
563 .expect("internal error"),
564 }
565 }
566}
567
568impl From<tokio::net::unix::SocketAddr> for Peer {
569 fn from(socket: tokio::net::unix::SocketAddr) -> Self {
570 let path = socket
571 .as_pathname()
572 .unwrap_or_else(|| Path::new("unnamed"))
573 .display();
574 Url::parse(&format!("unix://{path}"))
575 .map(Peer::Extern)
576 .expect("invalid url")
577 }
578}
579
580impl fmt::Display for Peer {
581 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
582 match self {
583 Peer::Extern(url) => write!(f, "Remote({url})"),
584 Peer::Container(container) => write!(f, "Container({container})"),
585 }
586 }
587}
588
589impl From<ExitStatus> for model::ExitStatus {
590 fn from(e: ExitStatus) -> Self {
591 match e {
592 ExitStatus::Exit(code) => api::model::ExitStatus::Exit { code },
593 ExitStatus::Signalled(signal) => api::model::ExitStatus::Signalled {
594 signal: signal as u32,
595 },
596 }
597 }
598}
599
600impl From<(Container, ContainerEvent)> for model::Notification {
601 fn from(p: (Container, ContainerEvent)) -> model::Notification {
602 let container = p.0.clone();
603 match p.1 {
604 ContainerEvent::Started => api::model::Notification::Started(container),
605 ContainerEvent::Exit(status) => {
606 api::model::Notification::Exit(container, status.into())
607 }
608 ContainerEvent::Installed => api::model::Notification::Install(container),
609 ContainerEvent::Uninstalled => api::model::Notification::Uninstall(container),
610 ContainerEvent::CGroup(event) => match event {
611 CGroupEvent::Memory(memory) => api::model::Notification::CGroup(
612 container,
613 api::model::CgroupNotification::Memory(api::model::MemoryNotification {
614 low: memory.low,
615 high: memory.high,
616 max: memory.max,
617 oom: memory.oom,
618 oom_kill: memory.oom_kill,
619 }),
620 ),
621 },
622 }
623 }
624}