use std::{
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
env,
iter::empty,
os::unix::prelude::FromRawFd,
path::Path,
pin::Pin,
task::Poll,
};
use error::Error;
use futures::{SinkExt, Stream, StreamExt};
use northstar_runtime::{
api::{
codec,
model::{
ConnectNack, Container, ContainerData, InspectResult, InstallResult, Message,
MountResult, Notification, RepositoryId, Request, Response, Token, UmountResult,
VerificationResult,
},
},
common::non_nul_string::NonNulString,
};
use tokio::{
fs,
io::{self, AsyncRead, AsyncWrite, BufWriter},
};
pub mod error;
pub use northstar_runtime::{
api::{model, VERSION},
common::name::Name,
};
const BUFFER_SIZE: usize = 1024 * 1024;
pub struct Client<T> {
connection: codec::Framed<T>,
notifications: Option<VecDeque<Notification>>,
}
pub type Connection<T> = codec::Framed<T>;
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
io: T,
subscribe_notifications: bool,
) -> Result<Connection<T>, Error> {
let mut connection = codec::framed(io);
connection
.send(Message::Connect {
connect: model::Connect {
version: VERSION,
subscribe_notifications,
},
})
.await?;
let message = connection.next().await.ok_or(Error::ConnectionClosed)??;
match message {
Message::ConnectAck { .. } => Ok(connection),
Message::ConnectNack { connect_nack } => match connect_nack {
ConnectNack::InvalidProtocolVersion { .. } => Err(Error::ProtocolVersion),
ConnectNack::PermissionDenied => Err(Error::PermissionDenied),
},
_ => unreachable!("expecting connect ack or connect nack"),
}
}
impl Client<tokio::net::UnixStream> {
pub async fn from_env(notifications: Option<usize>) -> Result<Self, Error> {
let fd = env::var("NORTHSTAR_CONSOLE")
.map_err(|_| io::Error::new(io::ErrorKind::Other, "missing env variable"))?
.parse::<i32>()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid env variable"))?;
let std = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) };
std.set_nonblocking(true)?;
let io = tokio::net::UnixStream::from_std(std)?;
let client = Client::new(io, notifications).await?;
Ok(client)
}
}
impl<'a, T: AsyncRead + AsyncWrite + Unpin> Client<T> {
pub async fn new(io: T, notifications: Option<usize>) -> Result<Client<T>, Error> {
let connection = connect(io, notifications.is_some()).await?;
Ok(Client {
connection,
notifications: notifications.map(VecDeque::with_capacity),
})
}
pub fn framed(self) -> Connection<T> {
self.connection
}
pub async fn request(&mut self, request: Request) -> Result<Response, Error> {
let message = Message::Request { request };
self.connection.send(message).await?;
loop {
let message = self
.connection
.next()
.await
.ok_or(Error::ConnectionClosed)??;
match message {
Message::Response { response } => break Ok(response),
Message::Notification { notification } => self.push_notification(notification)?,
_ => unreachable!("invalid message {:?}", message),
}
}
}
pub async fn ident(&mut self) -> Result<Container, Error> {
match self.request(Request::Ident).await? {
Response::Ident(container) => Ok(container),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on ident should be ident"),
}
}
pub async fn list(&mut self) -> Result<Vec<Container>, Error> {
match self.request(Request::List).await? {
Response::List(containers) => Ok(containers),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on containers should be containers"),
}
}
pub async fn repositories(&mut self) -> Result<HashSet<RepositoryId>, Error> {
match self.request(Request::Repositories).await? {
Response::Repositories(repositories) => Ok(repositories),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on repositories should be ok or error"),
}
}
pub async fn start<C>(&mut self, container: C) -> Result<(), Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
self.start_command(
container,
Option::<&str>::None,
empty::<&str>(),
empty::<(&str, &str)>(),
)
.await
}
pub async fn start_command<C, A, E, K>(
&mut self,
container: C,
init: Option<A>,
args: impl IntoIterator<Item = A>,
env: impl IntoIterator<Item = (E, K)>,
) -> Result<(), Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
A: TryInto<NonNulString>,
A::Error: std::error::Error + Send + Sync + 'static,
E: TryInto<NonNulString>,
E::Error: std::error::Error + Send + Sync + 'static,
K: TryInto<NonNulString>,
K::Error: std::error::Error + Send + Sync + 'static,
{
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(e.to_string()))?;
let init = if let Some(init) = init {
Some(
init.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid init: {e}")))?,
)
} else {
None
};
let mut args_converted = vec![];
for arg in args {
args_converted.push(
arg.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?,
);
}
let mut env_converted = HashMap::new();
for (key, value) in env {
let key = key
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
let value = value
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
env_converted.insert(key, value);
}
let arguments = args_converted;
let environment = env_converted;
let request = Request::Start {
container,
init,
arguments,
environment,
};
match self.request(request).await? {
Response::Start(model::StartResult::Ok { .. }) => Ok(()),
Response::Start(model::StartResult::Error { error, .. }) => Err(Error::Runtime(error)),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on start should be ok or error"),
}
}
pub async fn kill<C>(&mut self, container: C, signal: i32) -> Result<(), Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(e.to_string()))?;
match self.request(Request::Kill { container, signal }).await? {
Response::Kill(model::KillResult::Ok { .. }) => Ok(()),
Response::Kill(model::KillResult::Error { error, .. }) => Err(Error::Runtime(error)),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on kill should be ok or error"),
}
}
pub async fn install_file(&mut self, npk: &Path, repository: &str) -> Result<Container, Error> {
let file = fs::File::open(npk).await?;
let size = file.metadata().await?.len();
self.install(file, size, repository).await
}
pub async fn install(
&mut self,
npk: impl AsyncRead + Unpin,
size: u64,
repository: &str,
) -> Result<Container, Error> {
let request = Request::Install {
repository: repository.into(),
size,
};
let message = Message::Request { request };
self.connection.send(message).await?;
self.connection.flush().await?;
debug_assert!(self.connection.write_buffer().is_empty());
let mut reader = io::BufReader::with_capacity(BUFFER_SIZE, npk);
let mut writer = BufWriter::with_capacity(BUFFER_SIZE, self.connection.get_mut());
io::copy_buf(&mut reader, &mut writer).await?;
loop {
let message = self
.connection
.next()
.await
.ok_or(Error::ConnectionClosed)??;
match message {
Message::Response { response } => match response {
Response::Install(InstallResult::Ok { container }) => break Ok(container),
Response::Install(InstallResult::Error { error }) => {
break Err(Error::Runtime(error))
}
Response::PermissionDenied(_) => break Err(Error::PermissionDenied),
_ => unreachable!("response on install should be container or error"),
},
Message::Notification { notification } => self.push_notification(notification)?,
_ => unreachable!("invalid response"),
}
}
}
pub async fn uninstall<C>(&mut self, container: C, wipe: bool) -> Result<(), Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
match self.request(Request::Uninstall { container, wipe }).await? {
Response::Uninstall(model::UninstallResult::Ok { .. }) => Ok(()),
Response::Uninstall(model::UninstallResult::Error { error, .. }) => {
Err(Error::Runtime(error))
}
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on uninstall should be ok or error"),
}
}
pub async fn shutdown(&mut self) {
self.request(Request::Shutdown).await.ok();
}
pub async fn mount<C>(&mut self, container: C) -> Result<MountResult, Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
self.mount_all([container])
.await
.map(|mut r| r.pop().expect("invalid mount result"))
}
pub async fn mount_all<C, I>(&mut self, containers: I) -> Result<Vec<MountResult>, Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
I: 'a + IntoIterator<Item = C>,
{
let mut result = vec![];
for container in containers.into_iter() {
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
result.push(container);
}
match self.request(Request::Mount { containers: result }).await? {
Response::Mount(result) => Ok(result),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on umount_all should be mount"),
}
}
pub async fn umount<C>(&mut self, container: C) -> Result<UmountResult, Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
self.umount_all([container])
.await
.map(|mut r| r.pop().expect("invalid mount result"))
}
pub async fn umount_all<C, I>(&mut self, containers: I) -> Result<Vec<UmountResult>, Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
I: 'a + IntoIterator<Item = C>,
{
let containers = containers.into_iter();
let mut result = Vec::with_capacity(containers.size_hint().0);
for container in containers {
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
result.push(container);
}
match self.request(Request::Umount { containers: result }).await? {
Response::Umount(result) => Ok(result),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on umount should be umount"),
}
}
pub async fn inspect<C>(&mut self, container: C) -> Result<ContainerData, Error>
where
C: TryInto<Container>,
C::Error: std::error::Error + Send + Sync + 'static,
{
let container = container
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
match self.request(Request::Inspect { container }).await? {
Response::Inspect(InspectResult::Ok { container: _, data }) => Ok(*data),
Response::Inspect(InspectResult::Error {
container: _,
error,
}) => Err(Error::Runtime(error)),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on container_stats should be a container_stats"),
}
}
pub async fn create_token<R, S>(&mut self, target: R, shared: S) -> Result<Token, Error>
where
R: TryInto<Name>,
R::Error: std::error::Error + Send + Sync + 'static,
S: AsRef<[u8]>,
{
let target = target
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid target container name: {e}")))?;
let shared = shared.as_ref().to_vec();
match self
.request(Request::TokenCreate { target, shared })
.await?
{
Response::Token(token) => Ok(token),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on token should be a token reponse created"),
}
}
pub async fn verify_token<R, S>(
&mut self,
token: &Token,
user: R,
shared: S,
) -> Result<VerificationResult, Error>
where
R: TryInto<Name>,
R::Error: std::error::Error + Send + Sync + 'static,
S: AsRef<[u8]>,
{
let token = token.clone();
let shared = shared.as_ref().to_vec();
let user = user
.try_into()
.map_err(|e| Error::InvalidArgument(format!("invalid user container name: {e}")))?;
match self
.request(Request::TokenVerify {
token,
user,
shared,
})
.await?
{
Response::TokenVerification(result) => Ok(result),
Response::PermissionDenied(_) => Err(Error::PermissionDenied),
_ => unreachable!("response on token verification should be a token verification"),
}
}
fn push_notification(&mut self, notification: Notification) -> Result<(), Error> {
if let Some(notifications) = &mut self.notifications {
if notifications.len() == notifications.capacity() {
Err(Error::LaggedNotifications)
} else {
notifications.push_back(notification);
Ok(())
}
} else {
Ok(())
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin> Stream for Client<T> {
type Item = Result<Notification, io::Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(n) = self.notifications.as_mut().and_then(|n| n.pop_front()) {
Poll::Ready(Some(Ok(n)))
} else {
match self.connection.poll_next_unpin(cx) {
Poll::Ready(r) => match r {
Some(Ok(message)) => match message {
Message::Notification { notification } => {
Poll::Ready(Some(Ok(notification)))
}
_ => unreachable!(),
},
Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}
}
}