northstar_runtime/api/
codec.rs

1use super::model;
2use std::io::ErrorKind;
3use tokio::io::{self, AsyncRead, AsyncWrite};
4use tokio_util::codec::{Decoder, Encoder, LinesCodec};
5
6/// Newline delimited json
7pub type Framed<T> = tokio_util::codec::Framed<T, Codec>;
8
9/// Framed wrapper
10pub fn framed<T>(inner: T) -> Framed<T>
11where
12    T: AsyncRead + AsyncWrite,
13{
14    tokio_util::codec::Framed::new(inner, Codec::default())
15}
16
17/// Framed wrapper with a defined maximum line length
18pub fn framed_with_max_length<T>(inner: T, max_length: usize) -> Framed<T>
19where
20    T: AsyncRead + AsyncWrite,
21{
22    tokio_util::codec::Framed::new(inner, Codec::new_with_max_length(max_length))
23}
24
25/// Newline delimited json
26#[derive(Default)]
27pub struct Codec {
28    inner: LinesCodec,
29}
30
31impl Codec {
32    /// Returns a Codec with a maximum line length limit.
33    ///
34    /// If this is set, calls to Codec::decode will return a
35    /// io::Error when a line exceeds the length limit. Subsequent calls
36    /// will discard up to limit bytes from that line until a newline character
37    /// is reached, returning None until the line over the limit has been fully
38    /// discarded. After that point, calls to decode will function as normal.
39    pub fn new_with_max_length(max_length: usize) -> Codec {
40        Codec {
41            inner: LinesCodec::new_with_max_length(max_length),
42        }
43    }
44}
45
46impl Decoder for Codec {
47    type Item = model::Message;
48    type Error = io::Error;
49
50    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
51        self.inner
52            .decode(src)
53            .map_err(|e| io::Error::new(ErrorKind::Other, e))? // See LinesCodecError.
54            .as_deref()
55            .map(serde_json::from_str)
56            .transpose()
57            .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))
58    }
59}
60
61impl Encoder<model::Message> for Codec {
62    type Error = io::Error;
63
64    fn encode(
65        &mut self,
66        item: model::Message,
67        dst: &mut bytes::BytesMut,
68    ) -> Result<(), Self::Error> {
69        self.inner
70            .encode(serde_json::to_string(&item)?.as_str(), dst)
71            .map_err(|e| io::Error::new(ErrorKind::Other, e))
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78    use crate::api::model::{Message, Notification, Request};
79    use bytes::BytesMut;
80    use proptest::{prelude::Just, prop_oneof, proptest, strategy::Strategy};
81
82    proptest! {
83        #[test]
84        fn encoding_a_message_then_decoding_it_yields_the_same_message(initial_message in mk_message()) {
85            // Pre-condition.
86            let mut message_as_bytes = BytesMut::default();
87
88            // Action.
89            let mut codec = Codec::default();
90
91            codec.encode(initial_message.clone(), &mut message_as_bytes)?;
92            let message = codec.decode(&mut message_as_bytes)?;
93
94            // Post-condition.
95            assert_eq!(message, Some(initial_message));
96        }
97    }
98
99    fn mk_message() -> impl Strategy<Value = Message> {
100        prop_oneof![
101            Just(Message::Request {
102                request: Request::List
103            }),
104            Just(Message::Request {
105                request: Request::Shutdown
106            }),
107            Just(Message::Request {
108                request: Request::Mount { containers: vec!() },
109            }),
110            Just(Message::Notification {
111                notification: Notification::Shutdown
112            }),
113        ]
114    }
115}