northstar_client/
lib.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    convert::TryInto,
4    env,
5    iter::empty,
6    os::unix::prelude::FromRawFd,
7    path::Path,
8    pin::Pin,
9    task::Poll,
10};
11
12use error::Error;
13use futures::{SinkExt, Stream, StreamExt};
14use northstar_runtime::{
15    api::{
16        codec,
17        model::{
18            ConnectNack, Container, ContainerData, InspectResult, InstallResult, Message,
19            MountResult, Notification, RepositoryId, Request, Response, Token, UmountResult,
20            VerificationResult,
21        },
22    },
23    common::non_nul_string::NonNulString,
24};
25use tokio::{
26    fs,
27    io::{self, AsyncRead, AsyncWrite, BufWriter},
28};
29
30/// Client errors
31pub mod error;
32pub use northstar_runtime::{
33    api::{model, VERSION},
34    common::name::Name,
35};
36
/// Size in bytes (1 MiB) of the read and write buffers used while streaming an npk during
/// installation.
const BUFFER_SIZE: usize = 1 << 20;
39
40/// Client for a Northstar runtime instance.
41///
42/// ```no_run
43/// use futures::StreamExt;
44/// use northstar_client::Client;
45/// use northstar_client::model::Version;
46///
47/// # #[tokio::main(flavor = "current_thread")]
48/// async fn main() {
49///     let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
50///     client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
51///     while let Some(notification) = client.next().await {
52///         println!("{:?}", notification);
53///     }
54/// }
55/// ```
56pub struct Client<T> {
57    /// Connection to the runtime
58    connection: codec::Framed<T>,
59    /// Buffer notifications received during request response communication
60    notifications: Option<VecDeque<Notification>>,
61}
62
63/// Northstar console connection
64pub type Connection<T> = codec::Framed<T>;
65
66/// Connect and return a raw stream and sink interface. See codec for details
67///
68/// # Arguments
69///
70/// * `io` - Medium for the connection (e.g. Unix or TCP socket)
71/// * `subscribe_notifications` - Enables the reception of notifications through the connection.
72///
73/// # Errors
74///
75/// An error is returned in the following cases:
76///
77/// - A mismatch in the protocol version between both sides of the connection
78/// - Unnecessary permissions
79/// - OS errors
80///
81pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
82    io: T,
83    subscribe_notifications: bool,
84) -> Result<Connection<T>, Error> {
85    let mut connection = codec::framed(io);
86
87    // Send connect message
88    connection
89        .send(Message::Connect {
90            connect: model::Connect {
91                version: VERSION,
92                subscribe_notifications,
93            },
94        })
95        .await?;
96
97    // Wait for conack
98    let message = connection.next().await.ok_or(Error::ConnectionClosed)??;
99
100    match message {
101        Message::ConnectAck { .. } => Ok(connection),
102        Message::ConnectNack { connect_nack } => match connect_nack {
103            ConnectNack::InvalidProtocolVersion { .. } => Err(Error::ProtocolVersion),
104            ConnectNack::PermissionDenied => Err(Error::PermissionDenied),
105        },
106        _ => unreachable!("expecting connect ack or connect nack"),
107    }
108}
109
110impl Client<tokio::net::UnixStream> {
111    /// Tries to create a client by accessing `NORTHSTAR_CONSOLE` env variable
112    ///
113    /// # Errors
114    ///
115    /// An `Err` is returned if the `NORTHSTAR_CONSOLE` environment variable is not set or has an
116    /// invalid file descriptor for the unix socket.
117    ///
118    pub async fn from_env(notifications: Option<usize>) -> Result<Self, Error> {
119        let fd = env::var("NORTHSTAR_CONSOLE")
120            .map_err(|_| io::Error::new(io::ErrorKind::Other, "missing env variable"))?
121            .parse::<i32>()
122            .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid env variable"))?;
123
124        let std = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) };
125        std.set_nonblocking(true)?;
126
127        let io = tokio::net::UnixStream::from_std(std)?;
128        let client = Client::new(io, notifications).await?;
129        Ok(client)
130    }
131}
132
133impl<'a, T: AsyncRead + AsyncWrite + Unpin> Client<T> {
134    /// Create a new northstar client and connect to a runtime instance running on `host`.
135    ///
136    /// # Arguments
137    ///
138    /// * `io` - Connection medium (e.g. Unix or TCP socket)
139    /// * `notifications` - Optional buffer size for receiving notifications
140    /// * `timeout` - Timeout of connection establishment
141    ///
142    /// # Errors
143    ///
144    /// In addition to the errors that can happen when trying to [`connect`], an `Err` is returned
145    /// if the connection establishment times out.
146    ///
147    pub async fn new(io: T, notifications: Option<usize>) -> Result<Client<T>, Error> {
148        let connection = connect(io, notifications.is_some()).await?;
149
150        Ok(Client {
151            connection,
152            notifications: notifications.map(VecDeque::with_capacity),
153        })
154    }
155
156    /// Convert client into a connection
157    pub fn framed(self) -> Connection<T> {
158        self.connection
159    }
160
161    /// Perform a request response sequence
162    ///
163    /// ```no_run
164    /// # use futures::StreamExt;
165    /// # use northstar_client::Client;
166    /// # use northstar_client::model::Request::List;
167    /// #
168    /// # #[tokio::main(flavor = "current_thread")]
169    /// # async fn main() {
170    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
171    /// let response = client.request(List).await.expect("failed to request container list");
172    /// println!("{:?}", response);
173    /// # }
174    /// ```
175    pub async fn request(&mut self, request: Request) -> Result<Response, Error> {
176        let message = Message::Request { request };
177        self.connection.send(message).await?;
178        loop {
179            let message = self
180                .connection
181                .next()
182                .await
183                .ok_or(Error::ConnectionClosed)??;
184
185            match message {
186                Message::Response { response } => break Ok(response),
187                Message::Notification { notification } => self.push_notification(notification)?,
188                _ => unreachable!("invalid message {:?}", message),
189            }
190        }
191    }
192
193    /// Request the identificaiton of this container
194    ///
195    /// ```no_run
196    /// # use futures::StreamExt;
197    /// # use northstar_client::Client;
198    /// #
199    /// # #[tokio::main(flavor = "current_thread")]
200    /// # async fn main() {
201    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
202    /// let ident = client.ident().await.expect("failed to identity");
203    /// println!("{}", ident);
204    /// # }
205    /// ```
206    pub async fn ident(&mut self) -> Result<Container, Error> {
207        match self.request(Request::Ident).await? {
208            Response::Ident(container) => Ok(container),
209            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
210            _ => unreachable!("response on ident should be ident"),
211        }
212    }
213
214    /// Request a list of installed containers
215    ///
216    /// ```no_run
217    /// # use futures::StreamExt;
218    /// # use northstar_client::Client;
219    /// #
220    /// # #[tokio::main(flavor = "current_thread")]
221    /// # async fn main() {
222    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
223    /// let containers = client.list().await.expect("failed to request container list");
224    /// println!("{:#?}", containers);
225    /// # }
226    /// ```
227    pub async fn list(&mut self) -> Result<Vec<Container>, Error> {
228        match self.request(Request::List).await? {
229            Response::List(containers) => Ok(containers),
230            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
231            _ => unreachable!("response on containers should be containers"),
232        }
233    }
234
235    /// Request a list of repositories
236    ///
237    /// ```no_run
238    /// # use futures::StreamExt;
239    /// # use northstar_client::Client;
240    /// #
241    /// # #[tokio::main(flavor = "current_thread")]
242    /// # async fn main() {
243    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
244    /// let repositories = client.repositories().await.expect("failed to request repository list");
245    /// println!("{:#?}", repositories);
246    /// # }
247    /// ```
248    pub async fn repositories(&mut self) -> Result<HashSet<RepositoryId>, Error> {
249        match self.request(Request::Repositories).await? {
250            Response::Repositories(repositories) => Ok(repositories),
251            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
252            _ => unreachable!("response on repositories should be ok or error"),
253        }
254    }
255
256    /// Start container with name
257    ///
258    /// ```no_run
259    /// # use futures::StreamExt;
260    /// # use northstar_client::Client;
261    /// #
262    /// # #[tokio::main(flavor = "current_thread")]
263    /// # async fn main() {
264    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
265    /// client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
266    /// // Print start notification
267    /// println!("{:#?}", client.next().await);
268    /// # }
269    /// ```
270    pub async fn start<C>(&mut self, container: C) -> Result<(), Error>
271    where
272        C: TryInto<Container>,
273        C::Error: std::error::Error + Send + Sync + 'static,
274    {
275        self.start_command(
276            container,
277            Option::<&str>::None,
278            empty::<&str>(),
279            empty::<(&str, &str)>(),
280        )
281        .await
282    }
283
284    /// Start container name and pass init, arguments and set additional env variables.
285    ///
286    /// ```no_run
287    /// # use futures::StreamExt;
288    /// # use northstar_client::Client;
289    /// # use std::collections::HashMap;
290    /// #
291    /// # #[tokio::main(flavor = "current_thread")]
292    /// # async fn main() {
293    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
294    /// let mut env = HashMap::new();
295    /// env.insert("FOO", "blah");
296    /// client.start_command("hello:0.0.1", Some("/bin/hello"), ["--dump", "-v"], env).await.expect("failed to start \"hello\"");
297    /// // Print start notification
298    /// println!("{:#?}", client.next().await);
299    /// # }
300    /// ```
301    pub async fn start_command<C, A, E, K>(
302        &mut self,
303        container: C,
304        init: Option<A>,
305        args: impl IntoIterator<Item = A>,
306        env: impl IntoIterator<Item = (E, K)>,
307    ) -> Result<(), Error>
308    where
309        C: TryInto<Container>,
310        C::Error: std::error::Error + Send + Sync + 'static,
311        A: TryInto<NonNulString>,
312        A::Error: std::error::Error + Send + Sync + 'static,
313        E: TryInto<NonNulString>,
314        E::Error: std::error::Error + Send + Sync + 'static,
315        K: TryInto<NonNulString>,
316        K::Error: std::error::Error + Send + Sync + 'static,
317    {
318        let container = container
319            .try_into()
320            .map_err(|e| Error::InvalidArgument(e.to_string()))?;
321
322        let init = if let Some(init) = init {
323            Some(
324                init.try_into()
325                    .map_err(|e| Error::InvalidArgument(format!("invalid init: {e}")))?,
326            )
327        } else {
328            None
329        };
330
331        let mut args_converted = vec![];
332        for arg in args {
333            args_converted.push(
334                arg.try_into()
335                    .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?,
336            );
337        }
338
339        let mut env_converted = HashMap::new();
340        for (key, value) in env {
341            let key = key
342                .try_into()
343                .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
344            let value = value
345                .try_into()
346                .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
347            env_converted.insert(key, value);
348        }
349
350        let arguments = args_converted;
351        let environment = env_converted;
352        let request = Request::Start {
353            container,
354            init,
355            arguments,
356            environment,
357        };
358
359        match self.request(request).await? {
360            Response::Start(model::StartResult::Ok { .. }) => Ok(()),
361            Response::Start(model::StartResult::Error { error, .. }) => Err(Error::Runtime(error)),
362            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
363            _ => unreachable!("response on start should be ok or error"),
364        }
365    }
366
367    /// Kill container with name
368    ///
369    /// ```no_run
370    /// # use futures::StreamExt;
371    /// # use northstar_client::Client;
372    /// #
373    /// # #[tokio::main(flavor = "current_thread")]
374    /// # async fn main() {
375    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
376    /// client.kill("hello:0.0.1", 15).await.expect("failed to start \"hello\"");
377    /// // Print stop notification
378    /// println!("{:#?}", client.next().await);
379    /// # }
380    /// ```
381    pub async fn kill<C>(&mut self, container: C, signal: i32) -> Result<(), Error>
382    where
383        C: TryInto<Container>,
384        C::Error: std::error::Error + Send + Sync + 'static,
385    {
386        let container = container
387            .try_into()
388            .map_err(|e| Error::InvalidArgument(e.to_string()))?;
389        match self.request(Request::Kill { container, signal }).await? {
390            Response::Kill(model::KillResult::Ok { .. }) => Ok(()),
391            Response::Kill(model::KillResult::Error { error, .. }) => Err(Error::Runtime(error)),
392            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
393            _ => unreachable!("response on kill should be ok or error"),
394        }
395    }
396
397    /// Install a npk from path
398    ///
399    /// ```no_run
400    /// # use northstar_client::Client;
401    /// # use std::path::Path;
402    /// #
403    /// # #[tokio::main(flavor = "current_thread")]
404    /// # async fn main() {
405    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
406    /// let npk = Path::new("test.npk");
407    /// client.install_file(npk, "default").await.expect("failed to install \"test.npk\" into repository \"default\"");
408    /// # }
409    /// ```
410    pub async fn install_file(&mut self, npk: &Path, repository: &str) -> Result<Container, Error> {
411        let file = fs::File::open(npk).await?;
412        let size = file.metadata().await?.len();
413
414        self.install(file, size, repository).await
415    }
416
417    /// Install a npk
418    ///
419    /// ```no_run
420    /// # use northstar_client::Client;
421    /// # use std::path::Path;
422    /// # use tokio::fs;
423    /// #
424    /// # #[tokio::main(flavor = "current_thread")]
425    /// # async fn main() {
426    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
427    /// let npk = fs::File::open("test.npk").await.expect("failed to open \"test.npk\"");
428    /// let size = npk.metadata().await.unwrap().len();
429    /// client.install(npk, size, "default").await.expect("failed to install \"test.npk\" into repository \"default\"");
430    /// # }
431    /// ```
432    pub async fn install(
433        &mut self,
434        npk: impl AsyncRead + Unpin,
435        size: u64,
436        repository: &str,
437    ) -> Result<Container, Error> {
438        let request = Request::Install {
439            repository: repository.into(),
440            size,
441        };
442        let message = Message::Request { request };
443        self.connection.send(message).await?;
444        self.connection.flush().await?;
445        debug_assert!(self.connection.write_buffer().is_empty());
446
447        let mut reader = io::BufReader::with_capacity(BUFFER_SIZE, npk);
448        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, self.connection.get_mut());
449        io::copy_buf(&mut reader, &mut writer).await?;
450
451        loop {
452            let message = self
453                .connection
454                .next()
455                .await
456                .ok_or(Error::ConnectionClosed)??;
457
458            match message {
459                Message::Response { response } => match response {
460                    Response::Install(InstallResult::Ok { container }) => break Ok(container),
461                    Response::Install(InstallResult::Error { error }) => {
462                        break Err(Error::Runtime(error))
463                    }
464                    Response::PermissionDenied(_) => break Err(Error::PermissionDenied),
465                    _ => unreachable!("response on install should be container or error"),
466                },
467                Message::Notification { notification } => self.push_notification(notification)?,
468                _ => unreachable!("invalid response"),
469            }
470        }
471    }
472
473    /// Uninstall a npk and optionally wipe the containers persistent dir
474    ///
475    /// ```no_run
476    /// # use futures::StreamExt;
477    /// # use northstar_client::Client;
478    /// #
479    /// # #[tokio::main(flavor = "current_thread")]
480    /// # async fn main() {
481    /// #   let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
482    /// client.uninstall("hello:0.0.1", false).await.expect("failed to uninstall \"hello\"");
483    /// // Print stop notification
484    /// println!("{:#?}", client.next().await);
485    /// # }
486    /// ```
487    pub async fn uninstall<C>(&mut self, container: C, wipe: bool) -> Result<(), Error>
488    where
489        C: TryInto<Container>,
490        C::Error: std::error::Error + Send + Sync + 'static,
491    {
492        let container = container
493            .try_into()
494            .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
495        match self.request(Request::Uninstall { container, wipe }).await? {
496            Response::Uninstall(model::UninstallResult::Ok { .. }) => Ok(()),
497            Response::Uninstall(model::UninstallResult::Error { error, .. }) => {
498                Err(Error::Runtime(error))
499            }
500            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
501            _ => unreachable!("response on uninstall should be ok or error"),
502        }
503    }
504
505    /// Stop the runtime
506    pub async fn shutdown(&mut self) {
507        self.request(Request::Shutdown).await.ok();
508    }
509
510    /// Mount a container
511    /// ```no_run
512    /// # use northstar_client::Client;
513    /// # use std::convert::TryInto;
514    /// #
515    /// # #[tokio::main(flavor = "current_thread")]
516    /// # async fn main() {
517    /// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
518    /// client.mount("test:0.0.1").await.expect("failed to mount");
519    /// # }
520    /// ```
521    pub async fn mount<C>(&mut self, container: C) -> Result<MountResult, Error>
522    where
523        C: TryInto<Container>,
524        C::Error: std::error::Error + Send + Sync + 'static,
525    {
526        self.mount_all([container])
527            .await
528            .map(|mut r| r.pop().expect("invalid mount result"))
529    }
530
531    /// Mount a list of containers
532    /// ```no_run
533    /// # use northstar_client::Client;
534    /// # use northstar_client::model::Version;
535    /// # use std::path::Path;
536    /// # use std::convert::TryInto;
537    /// #
538    /// # #[tokio::main(flavor = "current_thread")]
539    /// # async fn main() {
540    /// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
541    /// client.mount_all(vec!("hello-world:0.0.1", "cpueater:0.0.1")).await.expect("failed to mount");
542    /// # }
543    /// ```
544    pub async fn mount_all<C, I>(&mut self, containers: I) -> Result<Vec<MountResult>, Error>
545    where
546        C: TryInto<Container>,
547        C::Error: std::error::Error + Send + Sync + 'static,
548        I: 'a + IntoIterator<Item = C>,
549    {
550        let mut result = vec![];
551        for container in containers.into_iter() {
552            let container = container
553                .try_into()
554                .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
555            result.push(container);
556        }
557
558        match self.request(Request::Mount { containers: result }).await? {
559            Response::Mount(result) => Ok(result),
560            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
561            _ => unreachable!("response on umount_all should be mount"),
562        }
563    }
564
565    /// Umount a mounted container
566    ///
567    /// ```no_run
568    /// # use northstar_client::Client;
569    /// #
570    /// # #[tokio::main(flavor = "current_thread")]
571    /// # async fn main() {
572    /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
573    /// client.umount("hello:0.0.1").await.expect("failed to unmount \"hello:0.0.1\"");
574    /// # }
575    /// ```
576    pub async fn umount<C>(&mut self, container: C) -> Result<UmountResult, Error>
577    where
578        C: TryInto<Container>,
579        C::Error: std::error::Error + Send + Sync + 'static,
580    {
581        self.umount_all([container])
582            .await
583            .map(|mut r| r.pop().expect("invalid mount result"))
584    }
585
586    /// Umount a list of mounted containers
587    ///
588    /// ```no_run
589    /// # use northstar_client::Client;
590    /// #
591    /// # #[tokio::main(flavor = "current_thread")]
592    /// # async fn main() {
593    /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
594    /// client.umount_all(vec!("hello:0.0.1", "cpueater:0.0.1")).await.expect("failed to unmount \"hello:0.0.1\" and \"cpueater:0.0.1\"");
595    /// # }
596    /// ```
597    pub async fn umount_all<C, I>(&mut self, containers: I) -> Result<Vec<UmountResult>, Error>
598    where
599        C: TryInto<Container>,
600        C::Error: std::error::Error + Send + Sync + 'static,
601        I: 'a + IntoIterator<Item = C>,
602    {
603        let containers = containers.into_iter();
604        let mut result = Vec::with_capacity(containers.size_hint().0);
605        for container in containers {
606            let container = container
607                .try_into()
608                .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
609            result.push(container);
610        }
611
612        match self.request(Request::Umount { containers: result }).await? {
613            Response::Umount(result) => Ok(result),
614            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
615            _ => unreachable!("response on umount should be umount"),
616        }
617    }
618
619    /// Gather container statistics
620    ///
621    /// ```no_run
622    /// # use northstar_client::Client;
623    /// #
624    /// # #[tokio::main]
625    /// # async fn main() {
626    /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
627    /// println!("{:?}", client.inspect("hello:0.0.1").await.unwrap());
628    /// # }
629    /// ```
630    pub async fn inspect<C>(&mut self, container: C) -> Result<ContainerData, Error>
631    where
632        C: TryInto<Container>,
633        C::Error: std::error::Error + Send + Sync + 'static,
634    {
635        let container = container
636            .try_into()
637            .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
638        match self.request(Request::Inspect { container }).await? {
639            Response::Inspect(InspectResult::Ok { container: _, data }) => Ok(*data),
640            Response::Inspect(InspectResult::Error {
641                container: _,
642                error,
643            }) => Err(Error::Runtime(error)),
644            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
645            _ => unreachable!("response on container_stats should be a container_stats"),
646        }
647    }
648
649    /// Create a token
650    ///
651    /// The `target` parameter must be the container name (without version) of the container that
652    /// will try to verify the token. The token can only be successfully verified by the container
653    /// that is started with the name `target`!
654    /// The `shared` parameter is added into the token in order to make it specific to a dedicated
655    /// purpose, e.g. "mqtt".
656    ///
657    /// ```no_run
658    /// # use northstar_client::Client;
659    /// #
660    /// # #[tokio::main]
661    /// # async fn main() {
662    /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
663    /// println!("{:?}", client.create_token("webserver", "http").await.unwrap());
664    /// # }
665    /// ```
666    pub async fn create_token<R, S>(&mut self, target: R, shared: S) -> Result<Token, Error>
667    where
668        R: TryInto<Name>,
669        R::Error: std::error::Error + Send + Sync + 'static,
670        S: AsRef<[u8]>,
671    {
672        let target = target
673            .try_into()
674            .map_err(|e| Error::InvalidArgument(format!("invalid target container name: {e}")))?;
675        let shared = shared.as_ref().to_vec();
676        match self
677            .request(Request::TokenCreate { target, shared })
678            .await?
679        {
680            Response::Token(token) => Ok(token),
681            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
682            _ => unreachable!("response on token should be a token reponse created"),
683        }
684    }
685
686    /// Verify a slice of bytes with a token
687    ///
688    /// The `token` parameter shall contain a token that is received from a container.
689    /// The `user` parameter must match the name of the container, that created the token
690    /// and send it to the container that want to verify the token.
691    /// `shared` is some salt that makes a token specific for a usecase can must just match
692    /// the value used when the the token is created.
693    ///
694    /// ```no_run
695    /// # use northstar_client::Client;
696    /// # use northstar_client::model::VerificationResult;
697    /// #
698    /// # #[tokio::main]
699    /// # async fn main() {
700    /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
701    /// let token = client.create_token("hello", "#noafd").await.unwrap(); // token can only verified by container `hello`
702    /// assert_eq!(client.verify_token(&token, "hello", "#noafd").await.unwrap(), VerificationResult::Ok);
703    /// assert_eq!(client.verify_token(&token, "hello", "").await.unwrap(), VerificationResult::Ok);
704    /// # }
705    /// ```
706    pub async fn verify_token<R, S>(
707        &mut self,
708        token: &Token,
709        user: R,
710        shared: S,
711    ) -> Result<VerificationResult, Error>
712    where
713        R: TryInto<Name>,
714        R::Error: std::error::Error + Send + Sync + 'static,
715        S: AsRef<[u8]>,
716    {
717        let token = token.clone();
718        let shared = shared.as_ref().to_vec();
719        let user = user
720            .try_into()
721            .map_err(|e| Error::InvalidArgument(format!("invalid user container name: {e}")))?;
722        match self
723            .request(Request::TokenVerify {
724                token,
725                user,
726                shared,
727            })
728            .await?
729        {
730            Response::TokenVerification(result) => Ok(result),
731            Response::PermissionDenied(_) => Err(Error::PermissionDenied),
732            _ => unreachable!("response on token verification should be a token verification"),
733        }
734    }
735
736    /// Store a notification in the notification queue
737    fn push_notification(&mut self, notification: Notification) -> Result<(), Error> {
738        if let Some(notifications) = &mut self.notifications {
739            if notifications.len() == notifications.capacity() {
740                Err(Error::LaggedNotifications)
741            } else {
742                notifications.push_back(notification);
743                Ok(())
744            }
745        } else {
746            Ok(())
747        }
748    }
749}
750
751/// Stream notifications
752///
753/// ```no_run
754/// use futures::StreamExt;
755/// # use northstar_client::Client;
756///
757/// # #[tokio::main(flavor = "current_thread")]
758/// async fn main() {
759///     let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
760///     client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
761///     while let Some(notification) = client.next().await {
762///         println!("{:?}", notification);
763///     }
764/// }
765/// ```
766impl<T: AsyncRead + AsyncWrite + Unpin> Stream for Client<T> {
767    type Item = Result<Notification, io::Error>;
768
769    fn poll_next(
770        mut self: Pin<&mut Self>,
771        cx: &mut std::task::Context<'_>,
772    ) -> Poll<Option<Self::Item>> {
773        if let Some(n) = self.notifications.as_mut().and_then(|n| n.pop_front()) {
774            Poll::Ready(Some(Ok(n)))
775        } else {
776            match self.connection.poll_next_unpin(cx) {
777                Poll::Ready(r) => match r {
778                    Some(Ok(message)) => match message {
779                        Message::Notification { notification } => {
780                            Poll::Ready(Some(Ok(notification)))
781                        }
782                        _ => unreachable!(),
783                    },
784                    Some(Err(e)) => Poll::Ready(Some(Err(e))),
785                    None => Poll::Ready(None),
786                },
787                Poll::Pending => Poll::Pending,
788            }
789        }
790    }
791}