Skip to main content

ntex_io/
framed.rs

1use std::{fmt, io};
2
3use ntex_codec::{Decoder, Encoder};
4use ntex_util::future::Either;
5
6use crate::IoBoxed;
7
8/// A unified interface to an underlying I/O stream.
9///
10/// Uses the `Encoder` and `Decoder` traits to encode and decode frames.
11pub struct Framed<U> {
12    io: IoBoxed,
13    codec: U,
14}
15
16impl<U> Framed<U> {
17    /// Provides an interface for reading from and writing to an `Io` object,
18    /// using the `Decode` and `Encode` traits of the codec.
19    pub fn new<Io>(io: Io, codec: U) -> Framed<U>
20    where
21        IoBoxed: From<Io>,
22    {
23        Framed {
24            codec,
25            io: IoBoxed::from(io),
26        }
27    }
28
29    #[inline]
30    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
31    pub fn get_io(&self) -> &IoBoxed {
32        &self.io
33    }
34
35    #[inline]
36    /// Returns a reference to the underlying codec.
37    pub fn get_codec(&self) -> &U {
38        &self.codec
39    }
40
41    #[inline]
42    /// Return inner types of framed object.
43    pub fn into_inner(self) -> (IoBoxed, U) {
44        (self.io, self.codec)
45    }
46}
47
48impl<U> Framed<U>
49where
50    U: Decoder + Encoder,
51{
52    /// Wake write task and instruct to flush data.
53    ///
54    /// This is async version of `poll_flush()` method.
55    pub async fn flush(&self, full: bool) -> Result<(), io::Error> {
56        self.io.flush(full).await
57    }
58
59    /// Shut down io stream.
60    pub async fn shutdown(&self) -> Result<(), io::Error> {
61        self.io.shutdown().await
62    }
63}
64
65impl<U> Framed<U>
66where
67    U: Decoder,
68{
69    #[inline]
70    /// Read incoming io stream and decode codec item.
71    pub async fn recv(&self) -> Result<Option<U::Item>, Either<U::Error, io::Error>> {
72        self.io.recv(&self.codec).await
73    }
74}
75
76impl<U> Framed<U>
77where
78    U: Encoder,
79{
80    #[inline]
81    /// Serialize item and write to the inner buffer
82    pub async fn send(
83        &self,
84        item: <U as Encoder>::Item,
85    ) -> Result<(), Either<U::Error, io::Error>> {
86        self.io.send(item, &self.codec).await
87    }
88}
89
90impl<U> fmt::Debug for Framed<U>
91where
92    U: fmt::Debug,
93{
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        f.debug_struct("Framed")
96            .field("codec", &self.codec)
97            .finish()
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use ntex_bytes::Bytes;
104    use ntex_codec::BytesCodec;
105
106    use super::*;
107    use crate::{Io, testing::IoTest};
108
109    #[ntex::test]
110    async fn framed() {
111        let (client, server) = IoTest::create();
112        client.remote_buffer_cap(1024);
113        client.write(b"chunk-0");
114
115        let server = Framed::new(Io::from(server), BytesCodec);
116        server.get_codec();
117        server.get_io();
118        assert!(format!("{server:?}").contains("Framed"));
119
120        let item = server.recv().await.unwrap().unwrap();
121        assert_eq!(item, b"chunk-0".as_ref());
122
123        let data = Bytes::from_static(b"chunk-1");
124        server.send(data).await.unwrap();
125        server.flush(true).await.unwrap();
126        assert_eq!(client.read_any(), b"chunk-1".as_ref());
127
128        server.shutdown().await.unwrap();
129        assert!(client.is_closed());
130    }
131}