northstar_client/lib.rs
1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 convert::TryInto,
4 env,
5 iter::empty,
6 os::unix::prelude::FromRawFd,
7 path::Path,
8 pin::Pin,
9 task::Poll,
10};
11
12use error::Error;
13use futures::{SinkExt, Stream, StreamExt};
14use northstar_runtime::{
15 api::{
16 codec,
17 model::{
18 ConnectNack, Container, ContainerData, InspectResult, InstallResult, Message,
19 MountResult, Notification, RepositoryId, Request, Response, Token, UmountResult,
20 VerificationResult,
21 },
22 },
23 common::non_nul_string::NonNulString,
24};
25use tokio::{
26 fs,
27 io::{self, AsyncRead, AsyncWrite, BufWriter},
28};
29
30/// Client errors
31pub mod error;
32pub use northstar_runtime::{
33 api::{model, VERSION},
34 common::name::Name,
35};
36
37/// Default buffer size for installation transfers
38const BUFFER_SIZE: usize = 1024 * 1024;
39
40/// Client for a Northstar runtime instance.
41///
42/// ```no_run
43/// use futures::StreamExt;
44/// use northstar_client::Client;
45/// use northstar_client::model::Version;
46///
47/// # #[tokio::main(flavor = "current_thread")]
48/// async fn main() {
49/// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
50/// client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
51/// while let Some(notification) = client.next().await {
52/// println!("{:?}", notification);
53/// }
54/// }
55/// ```
56pub struct Client<T> {
57 /// Connection to the runtime
58 connection: codec::Framed<T>,
59 /// Buffer notifications received during request response communication
60 notifications: Option<VecDeque<Notification>>,
61}
62
63/// Northstar console connection
64pub type Connection<T> = codec::Framed<T>;
65
66/// Connect and return a raw stream and sink interface. See codec for details
67///
68/// # Arguments
69///
70/// * `io` - Medium for the connection (e.g. Unix or TCP socket)
71/// * `subscribe_notifications` - Enables the reception of notifications through the connection.
72///
73/// # Errors
74///
75/// An error is returned in the following cases:
76///
77/// - A mismatch in the protocol version between both sides of the connection
78/// - Unnecessary permissions
79/// - OS errors
80///
81pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
82 io: T,
83 subscribe_notifications: bool,
84) -> Result<Connection<T>, Error> {
85 let mut connection = codec::framed(io);
86
87 // Send connect message
88 connection
89 .send(Message::Connect {
90 connect: model::Connect {
91 version: VERSION,
92 subscribe_notifications,
93 },
94 })
95 .await?;
96
97 // Wait for conack
98 let message = connection.next().await.ok_or(Error::ConnectionClosed)??;
99
100 match message {
101 Message::ConnectAck { .. } => Ok(connection),
102 Message::ConnectNack { connect_nack } => match connect_nack {
103 ConnectNack::InvalidProtocolVersion { .. } => Err(Error::ProtocolVersion),
104 ConnectNack::PermissionDenied => Err(Error::PermissionDenied),
105 },
106 _ => unreachable!("expecting connect ack or connect nack"),
107 }
108}
109
110impl Client<tokio::net::UnixStream> {
111 /// Tries to create a client by accessing `NORTHSTAR_CONSOLE` env variable
112 ///
113 /// # Errors
114 ///
115 /// An `Err` is returned if the `NORTHSTAR_CONSOLE` environment variable is not set or has an
116 /// invalid file descriptor for the unix socket.
117 ///
118 pub async fn from_env(notifications: Option<usize>) -> Result<Self, Error> {
119 let fd = env::var("NORTHSTAR_CONSOLE")
120 .map_err(|_| io::Error::new(io::ErrorKind::Other, "missing env variable"))?
121 .parse::<i32>()
122 .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid env variable"))?;
123
124 let std = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) };
125 std.set_nonblocking(true)?;
126
127 let io = tokio::net::UnixStream::from_std(std)?;
128 let client = Client::new(io, notifications).await?;
129 Ok(client)
130 }
131}
132
133impl<'a, T: AsyncRead + AsyncWrite + Unpin> Client<T> {
134 /// Create a new northstar client and connect to a runtime instance running on `host`.
135 ///
136 /// # Arguments
137 ///
138 /// * `io` - Connection medium (e.g. Unix or TCP socket)
139 /// * `notifications` - Optional buffer size for receiving notifications
140 /// * `timeout` - Timeout of connection establishment
141 ///
142 /// # Errors
143 ///
144 /// In addition to the errors that can happen when trying to [`connect`], an `Err` is returned
145 /// if the connection establishment times out.
146 ///
147 pub async fn new(io: T, notifications: Option<usize>) -> Result<Client<T>, Error> {
148 let connection = connect(io, notifications.is_some()).await?;
149
150 Ok(Client {
151 connection,
152 notifications: notifications.map(VecDeque::with_capacity),
153 })
154 }
155
156 /// Convert client into a connection
157 pub fn framed(self) -> Connection<T> {
158 self.connection
159 }
160
161 /// Perform a request response sequence
162 ///
163 /// ```no_run
164 /// # use futures::StreamExt;
165 /// # use northstar_client::Client;
166 /// # use northstar_client::model::Request::List;
167 /// #
168 /// # #[tokio::main(flavor = "current_thread")]
169 /// # async fn main() {
170 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
171 /// let response = client.request(List).await.expect("failed to request container list");
172 /// println!("{:?}", response);
173 /// # }
174 /// ```
175 pub async fn request(&mut self, request: Request) -> Result<Response, Error> {
176 let message = Message::Request { request };
177 self.connection.send(message).await?;
178 loop {
179 let message = self
180 .connection
181 .next()
182 .await
183 .ok_or(Error::ConnectionClosed)??;
184
185 match message {
186 Message::Response { response } => break Ok(response),
187 Message::Notification { notification } => self.push_notification(notification)?,
188 _ => unreachable!("invalid message {:?}", message),
189 }
190 }
191 }
192
193 /// Request the identificaiton of this container
194 ///
195 /// ```no_run
196 /// # use futures::StreamExt;
197 /// # use northstar_client::Client;
198 /// #
199 /// # #[tokio::main(flavor = "current_thread")]
200 /// # async fn main() {
201 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
202 /// let ident = client.ident().await.expect("failed to identity");
203 /// println!("{}", ident);
204 /// # }
205 /// ```
206 pub async fn ident(&mut self) -> Result<Container, Error> {
207 match self.request(Request::Ident).await? {
208 Response::Ident(container) => Ok(container),
209 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
210 _ => unreachable!("response on ident should be ident"),
211 }
212 }
213
214 /// Request a list of installed containers
215 ///
216 /// ```no_run
217 /// # use futures::StreamExt;
218 /// # use northstar_client::Client;
219 /// #
220 /// # #[tokio::main(flavor = "current_thread")]
221 /// # async fn main() {
222 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
223 /// let containers = client.list().await.expect("failed to request container list");
224 /// println!("{:#?}", containers);
225 /// # }
226 /// ```
227 pub async fn list(&mut self) -> Result<Vec<Container>, Error> {
228 match self.request(Request::List).await? {
229 Response::List(containers) => Ok(containers),
230 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
231 _ => unreachable!("response on containers should be containers"),
232 }
233 }
234
235 /// Request a list of repositories
236 ///
237 /// ```no_run
238 /// # use futures::StreamExt;
239 /// # use northstar_client::Client;
240 /// #
241 /// # #[tokio::main(flavor = "current_thread")]
242 /// # async fn main() {
243 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
244 /// let repositories = client.repositories().await.expect("failed to request repository list");
245 /// println!("{:#?}", repositories);
246 /// # }
247 /// ```
248 pub async fn repositories(&mut self) -> Result<HashSet<RepositoryId>, Error> {
249 match self.request(Request::Repositories).await? {
250 Response::Repositories(repositories) => Ok(repositories),
251 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
252 _ => unreachable!("response on repositories should be ok or error"),
253 }
254 }
255
256 /// Start container with name
257 ///
258 /// ```no_run
259 /// # use futures::StreamExt;
260 /// # use northstar_client::Client;
261 /// #
262 /// # #[tokio::main(flavor = "current_thread")]
263 /// # async fn main() {
264 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
265 /// client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
266 /// // Print start notification
267 /// println!("{:#?}", client.next().await);
268 /// # }
269 /// ```
270 pub async fn start<C>(&mut self, container: C) -> Result<(), Error>
271 where
272 C: TryInto<Container>,
273 C::Error: std::error::Error + Send + Sync + 'static,
274 {
275 self.start_command(
276 container,
277 Option::<&str>::None,
278 empty::<&str>(),
279 empty::<(&str, &str)>(),
280 )
281 .await
282 }
283
284 /// Start container name and pass init, arguments and set additional env variables.
285 ///
286 /// ```no_run
287 /// # use futures::StreamExt;
288 /// # use northstar_client::Client;
289 /// # use std::collections::HashMap;
290 /// #
291 /// # #[tokio::main(flavor = "current_thread")]
292 /// # async fn main() {
293 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
294 /// let mut env = HashMap::new();
295 /// env.insert("FOO", "blah");
296 /// client.start_command("hello:0.0.1", Some("/bin/hello"), ["--dump", "-v"], env).await.expect("failed to start \"hello\"");
297 /// // Print start notification
298 /// println!("{:#?}", client.next().await);
299 /// # }
300 /// ```
301 pub async fn start_command<C, A, E, K>(
302 &mut self,
303 container: C,
304 init: Option<A>,
305 args: impl IntoIterator<Item = A>,
306 env: impl IntoIterator<Item = (E, K)>,
307 ) -> Result<(), Error>
308 where
309 C: TryInto<Container>,
310 C::Error: std::error::Error + Send + Sync + 'static,
311 A: TryInto<NonNulString>,
312 A::Error: std::error::Error + Send + Sync + 'static,
313 E: TryInto<NonNulString>,
314 E::Error: std::error::Error + Send + Sync + 'static,
315 K: TryInto<NonNulString>,
316 K::Error: std::error::Error + Send + Sync + 'static,
317 {
318 let container = container
319 .try_into()
320 .map_err(|e| Error::InvalidArgument(e.to_string()))?;
321
322 let init = if let Some(init) = init {
323 Some(
324 init.try_into()
325 .map_err(|e| Error::InvalidArgument(format!("invalid init: {e}")))?,
326 )
327 } else {
328 None
329 };
330
331 let mut args_converted = vec![];
332 for arg in args {
333 args_converted.push(
334 arg.try_into()
335 .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?,
336 );
337 }
338
339 let mut env_converted = HashMap::new();
340 for (key, value) in env {
341 let key = key
342 .try_into()
343 .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
344 let value = value
345 .try_into()
346 .map_err(|e| Error::InvalidArgument(format!("invalid argument: {e}")))?;
347 env_converted.insert(key, value);
348 }
349
350 let arguments = args_converted;
351 let environment = env_converted;
352 let request = Request::Start {
353 container,
354 init,
355 arguments,
356 environment,
357 };
358
359 match self.request(request).await? {
360 Response::Start(model::StartResult::Ok { .. }) => Ok(()),
361 Response::Start(model::StartResult::Error { error, .. }) => Err(Error::Runtime(error)),
362 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
363 _ => unreachable!("response on start should be ok or error"),
364 }
365 }
366
367 /// Kill container with name
368 ///
369 /// ```no_run
370 /// # use futures::StreamExt;
371 /// # use northstar_client::Client;
372 /// #
373 /// # #[tokio::main(flavor = "current_thread")]
374 /// # async fn main() {
375 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
376 /// client.kill("hello:0.0.1", 15).await.expect("failed to start \"hello\"");
377 /// // Print stop notification
378 /// println!("{:#?}", client.next().await);
379 /// # }
380 /// ```
381 pub async fn kill<C>(&mut self, container: C, signal: i32) -> Result<(), Error>
382 where
383 C: TryInto<Container>,
384 C::Error: std::error::Error + Send + Sync + 'static,
385 {
386 let container = container
387 .try_into()
388 .map_err(|e| Error::InvalidArgument(e.to_string()))?;
389 match self.request(Request::Kill { container, signal }).await? {
390 Response::Kill(model::KillResult::Ok { .. }) => Ok(()),
391 Response::Kill(model::KillResult::Error { error, .. }) => Err(Error::Runtime(error)),
392 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
393 _ => unreachable!("response on kill should be ok or error"),
394 }
395 }
396
397 /// Install a npk from path
398 ///
399 /// ```no_run
400 /// # use northstar_client::Client;
401 /// # use std::path::Path;
402 /// #
403 /// # #[tokio::main(flavor = "current_thread")]
404 /// # async fn main() {
405 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
406 /// let npk = Path::new("test.npk");
407 /// client.install_file(npk, "default").await.expect("failed to install \"test.npk\" into repository \"default\"");
408 /// # }
409 /// ```
410 pub async fn install_file(&mut self, npk: &Path, repository: &str) -> Result<Container, Error> {
411 let file = fs::File::open(npk).await?;
412 let size = file.metadata().await?.len();
413
414 self.install(file, size, repository).await
415 }
416
417 /// Install a npk
418 ///
419 /// ```no_run
420 /// # use northstar_client::Client;
421 /// # use std::path::Path;
422 /// # use tokio::fs;
423 /// #
424 /// # #[tokio::main(flavor = "current_thread")]
425 /// # async fn main() {
426 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
427 /// let npk = fs::File::open("test.npk").await.expect("failed to open \"test.npk\"");
428 /// let size = npk.metadata().await.unwrap().len();
429 /// client.install(npk, size, "default").await.expect("failed to install \"test.npk\" into repository \"default\"");
430 /// # }
431 /// ```
432 pub async fn install(
433 &mut self,
434 npk: impl AsyncRead + Unpin,
435 size: u64,
436 repository: &str,
437 ) -> Result<Container, Error> {
438 let request = Request::Install {
439 repository: repository.into(),
440 size,
441 };
442 let message = Message::Request { request };
443 self.connection.send(message).await?;
444 self.connection.flush().await?;
445 debug_assert!(self.connection.write_buffer().is_empty());
446
447 let mut reader = io::BufReader::with_capacity(BUFFER_SIZE, npk);
448 let mut writer = BufWriter::with_capacity(BUFFER_SIZE, self.connection.get_mut());
449 io::copy_buf(&mut reader, &mut writer).await?;
450
451 loop {
452 let message = self
453 .connection
454 .next()
455 .await
456 .ok_or(Error::ConnectionClosed)??;
457
458 match message {
459 Message::Response { response } => match response {
460 Response::Install(InstallResult::Ok { container }) => break Ok(container),
461 Response::Install(InstallResult::Error { error }) => {
462 break Err(Error::Runtime(error))
463 }
464 Response::PermissionDenied(_) => break Err(Error::PermissionDenied),
465 _ => unreachable!("response on install should be container or error"),
466 },
467 Message::Notification { notification } => self.push_notification(notification)?,
468 _ => unreachable!("invalid response"),
469 }
470 }
471 }
472
473 /// Uninstall a npk and optionally wipe the containers persistent dir
474 ///
475 /// ```no_run
476 /// # use futures::StreamExt;
477 /// # use northstar_client::Client;
478 /// #
479 /// # #[tokio::main(flavor = "current_thread")]
480 /// # async fn main() {
481 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
482 /// client.uninstall("hello:0.0.1", false).await.expect("failed to uninstall \"hello\"");
483 /// // Print stop notification
484 /// println!("{:#?}", client.next().await);
485 /// # }
486 /// ```
487 pub async fn uninstall<C>(&mut self, container: C, wipe: bool) -> Result<(), Error>
488 where
489 C: TryInto<Container>,
490 C::Error: std::error::Error + Send + Sync + 'static,
491 {
492 let container = container
493 .try_into()
494 .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
495 match self.request(Request::Uninstall { container, wipe }).await? {
496 Response::Uninstall(model::UninstallResult::Ok { .. }) => Ok(()),
497 Response::Uninstall(model::UninstallResult::Error { error, .. }) => {
498 Err(Error::Runtime(error))
499 }
500 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
501 _ => unreachable!("response on uninstall should be ok or error"),
502 }
503 }
504
505 /// Stop the runtime
506 pub async fn shutdown(&mut self) {
507 self.request(Request::Shutdown).await.ok();
508 }
509
510 /// Mount a container
511 /// ```no_run
512 /// # use northstar_client::Client;
513 /// # use std::convert::TryInto;
514 /// #
515 /// # #[tokio::main(flavor = "current_thread")]
516 /// # async fn main() {
517 /// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
518 /// client.mount("test:0.0.1").await.expect("failed to mount");
519 /// # }
520 /// ```
521 pub async fn mount<C>(&mut self, container: C) -> Result<MountResult, Error>
522 where
523 C: TryInto<Container>,
524 C::Error: std::error::Error + Send + Sync + 'static,
525 {
526 self.mount_all([container])
527 .await
528 .map(|mut r| r.pop().expect("invalid mount result"))
529 }
530
531 /// Mount a list of containers
532 /// ```no_run
533 /// # use northstar_client::Client;
534 /// # use northstar_client::model::Version;
535 /// # use std::path::Path;
536 /// # use std::convert::TryInto;
537 /// #
538 /// # #[tokio::main(flavor = "current_thread")]
539 /// # async fn main() {
540 /// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
541 /// client.mount_all(vec!("hello-world:0.0.1", "cpueater:0.0.1")).await.expect("failed to mount");
542 /// # }
543 /// ```
544 pub async fn mount_all<C, I>(&mut self, containers: I) -> Result<Vec<MountResult>, Error>
545 where
546 C: TryInto<Container>,
547 C::Error: std::error::Error + Send + Sync + 'static,
548 I: 'a + IntoIterator<Item = C>,
549 {
550 let mut result = vec![];
551 for container in containers.into_iter() {
552 let container = container
553 .try_into()
554 .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
555 result.push(container);
556 }
557
558 match self.request(Request::Mount { containers: result }).await? {
559 Response::Mount(result) => Ok(result),
560 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
561 _ => unreachable!("response on umount_all should be mount"),
562 }
563 }
564
565 /// Umount a mounted container
566 ///
567 /// ```no_run
568 /// # use northstar_client::Client;
569 /// #
570 /// # #[tokio::main(flavor = "current_thread")]
571 /// # async fn main() {
572 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
573 /// client.umount("hello:0.0.1").await.expect("failed to unmount \"hello:0.0.1\"");
574 /// # }
575 /// ```
576 pub async fn umount<C>(&mut self, container: C) -> Result<UmountResult, Error>
577 where
578 C: TryInto<Container>,
579 C::Error: std::error::Error + Send + Sync + 'static,
580 {
581 self.umount_all([container])
582 .await
583 .map(|mut r| r.pop().expect("invalid mount result"))
584 }
585
586 /// Umount a list of mounted containers
587 ///
588 /// ```no_run
589 /// # use northstar_client::Client;
590 /// #
591 /// # #[tokio::main(flavor = "current_thread")]
592 /// # async fn main() {
593 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
594 /// client.umount_all(vec!("hello:0.0.1", "cpueater:0.0.1")).await.expect("failed to unmount \"hello:0.0.1\" and \"cpueater:0.0.1\"");
595 /// # }
596 /// ```
597 pub async fn umount_all<C, I>(&mut self, containers: I) -> Result<Vec<UmountResult>, Error>
598 where
599 C: TryInto<Container>,
600 C::Error: std::error::Error + Send + Sync + 'static,
601 I: 'a + IntoIterator<Item = C>,
602 {
603 let containers = containers.into_iter();
604 let mut result = Vec::with_capacity(containers.size_hint().0);
605 for container in containers {
606 let container = container
607 .try_into()
608 .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
609 result.push(container);
610 }
611
612 match self.request(Request::Umount { containers: result }).await? {
613 Response::Umount(result) => Ok(result),
614 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
615 _ => unreachable!("response on umount should be umount"),
616 }
617 }
618
619 /// Gather container statistics
620 ///
621 /// ```no_run
622 /// # use northstar_client::Client;
623 /// #
624 /// # #[tokio::main]
625 /// # async fn main() {
626 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
627 /// println!("{:?}", client.inspect("hello:0.0.1").await.unwrap());
628 /// # }
629 /// ```
630 pub async fn inspect<C>(&mut self, container: C) -> Result<ContainerData, Error>
631 where
632 C: TryInto<Container>,
633 C::Error: std::error::Error + Send + Sync + 'static,
634 {
635 let container = container
636 .try_into()
637 .map_err(|e| Error::InvalidArgument(format!("invalid container: {e}")))?;
638 match self.request(Request::Inspect { container }).await? {
639 Response::Inspect(InspectResult::Ok { container: _, data }) => Ok(*data),
640 Response::Inspect(InspectResult::Error {
641 container: _,
642 error,
643 }) => Err(Error::Runtime(error)),
644 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
645 _ => unreachable!("response on container_stats should be a container_stats"),
646 }
647 }
648
649 /// Create a token
650 ///
651 /// The `target` parameter must be the container name (without version) of the container that
652 /// will try to verify the token. The token can only be successfully verified by the container
653 /// that is started with the name `target`!
654 /// The `shared` parameter is added into the token in order to make it specific to a dedicated
655 /// purpose, e.g. "mqtt".
656 ///
657 /// ```no_run
658 /// # use northstar_client::Client;
659 /// #
660 /// # #[tokio::main]
661 /// # async fn main() {
662 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
663 /// println!("{:?}", client.create_token("webserver", "http").await.unwrap());
664 /// # }
665 /// ```
666 pub async fn create_token<R, S>(&mut self, target: R, shared: S) -> Result<Token, Error>
667 where
668 R: TryInto<Name>,
669 R::Error: std::error::Error + Send + Sync + 'static,
670 S: AsRef<[u8]>,
671 {
672 let target = target
673 .try_into()
674 .map_err(|e| Error::InvalidArgument(format!("invalid target container name: {e}")))?;
675 let shared = shared.as_ref().to_vec();
676 match self
677 .request(Request::TokenCreate { target, shared })
678 .await?
679 {
680 Response::Token(token) => Ok(token),
681 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
682 _ => unreachable!("response on token should be a token reponse created"),
683 }
684 }
685
686 /// Verify a slice of bytes with a token
687 ///
688 /// The `token` parameter shall contain a token that is received from a container.
689 /// The `user` parameter must match the name of the container, that created the token
690 /// and send it to the container that want to verify the token.
691 /// `shared` is some salt that makes a token specific for a usecase can must just match
692 /// the value used when the the token is created.
693 ///
694 /// ```no_run
695 /// # use northstar_client::Client;
696 /// # use northstar_client::model::VerificationResult;
697 /// #
698 /// # #[tokio::main]
699 /// # async fn main() {
700 /// # let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
701 /// let token = client.create_token("hello", "#noafd").await.unwrap(); // token can only verified by container `hello`
702 /// assert_eq!(client.verify_token(&token, "hello", "#noafd").await.unwrap(), VerificationResult::Ok);
703 /// assert_eq!(client.verify_token(&token, "hello", "").await.unwrap(), VerificationResult::Ok);
704 /// # }
705 /// ```
706 pub async fn verify_token<R, S>(
707 &mut self,
708 token: &Token,
709 user: R,
710 shared: S,
711 ) -> Result<VerificationResult, Error>
712 where
713 R: TryInto<Name>,
714 R::Error: std::error::Error + Send + Sync + 'static,
715 S: AsRef<[u8]>,
716 {
717 let token = token.clone();
718 let shared = shared.as_ref().to_vec();
719 let user = user
720 .try_into()
721 .map_err(|e| Error::InvalidArgument(format!("invalid user container name: {e}")))?;
722 match self
723 .request(Request::TokenVerify {
724 token,
725 user,
726 shared,
727 })
728 .await?
729 {
730 Response::TokenVerification(result) => Ok(result),
731 Response::PermissionDenied(_) => Err(Error::PermissionDenied),
732 _ => unreachable!("response on token verification should be a token verification"),
733 }
734 }
735
736 /// Store a notification in the notification queue
737 fn push_notification(&mut self, notification: Notification) -> Result<(), Error> {
738 if let Some(notifications) = &mut self.notifications {
739 if notifications.len() == notifications.capacity() {
740 Err(Error::LaggedNotifications)
741 } else {
742 notifications.push_back(notification);
743 Ok(())
744 }
745 } else {
746 Ok(())
747 }
748 }
749}
750
751/// Stream notifications
752///
753/// ```no_run
754/// use futures::StreamExt;
755/// # use northstar_client::Client;
756///
757/// # #[tokio::main(flavor = "current_thread")]
758/// async fn main() {
759/// let mut client = Client::new(tokio::net::TcpStream::connect("localhost:4200").await.unwrap(), None).await.unwrap();
760/// client.start("hello:0.0.1").await.expect("failed to start \"hello\"");
761/// while let Some(notification) = client.next().await {
762/// println!("{:?}", notification);
763/// }
764/// }
765/// ```
766impl<T: AsyncRead + AsyncWrite + Unpin> Stream for Client<T> {
767 type Item = Result<Notification, io::Error>;
768
769 fn poll_next(
770 mut self: Pin<&mut Self>,
771 cx: &mut std::task::Context<'_>,
772 ) -> Poll<Option<Self::Item>> {
773 if let Some(n) = self.notifications.as_mut().and_then(|n| n.pop_front()) {
774 Poll::Ready(Some(Ok(n)))
775 } else {
776 match self.connection.poll_next_unpin(cx) {
777 Poll::Ready(r) => match r {
778 Some(Ok(message)) => match message {
779 Message::Notification { notification } => {
780 Poll::Ready(Some(Ok(notification)))
781 }
782 _ => unreachable!(),
783 },
784 Some(Err(e)) => Poll::Ready(Some(Err(e))),
785 None => Poll::Ready(None),
786 },
787 Poll::Pending => Poll::Pending,
788 }
789 }
790 }
791}