ntex_io/
framed.rs

1use std::{fmt, io};
2
3use ntex_codec::{Decoder, Encoder};
4use ntex_util::future::Either;
5
6use crate::IoBoxed;
7
8/// A unified interface to an underlying I/O object, using
9/// the `Encoder` and `Decoder` traits to encode and decode frames.
10/// `Framed` is heavily optimized for streaming io.
11pub struct Framed<U> {
12    io: IoBoxed,
13    codec: U,
14}
15
16impl<U> Framed<U> {
17    #[inline]
18    /// Provides an interface for reading and writing to
19    /// `Io` object, using `Decode` and `Encode` traits of codec.
20    pub fn new<Io>(io: Io, codec: U) -> Framed<U>
21    where
22        IoBoxed: From<Io>,
23    {
24        Framed {
25            codec,
26            io: IoBoxed::from(io),
27        }
28    }
29
30    #[inline]
31    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
32    pub fn get_io(&self) -> &IoBoxed {
33        &self.io
34    }
35
36    #[inline]
37    /// Returns a reference to the underlying codec.
38    pub fn get_codec(&self) -> &U {
39        &self.codec
40    }
41
42    #[inline]
43    /// Return inner types of framed object
44    pub fn into_inner(self) -> (IoBoxed, U) {
45        (self.io, self.codec)
46    }
47}
48
49impl<U> Framed<U>
50where
51    U: Decoder + Encoder,
52{
53    #[inline]
54    /// Wake write task and instruct to flush data.
55    ///
56    /// This is async version of .poll_flush() method.
57    pub async fn flush(&self, full: bool) -> Result<(), io::Error> {
58        self.io.flush(full).await
59    }
60
61    #[inline]
62    /// Shut down io stream
63    pub async fn shutdown(&self) -> Result<(), io::Error> {
64        self.io.shutdown().await
65    }
66}
67
68impl<U> Framed<U>
69where
70    U: Decoder,
71{
72    #[inline]
73    /// Read incoming io stream and decode codec item.
74    pub async fn recv(&self) -> Result<Option<U::Item>, Either<U::Error, io::Error>> {
75        self.io.recv(&self.codec).await
76    }
77}
78
79impl<U> Framed<U>
80where
81    U: Encoder,
82{
83    #[inline]
84    /// Serialize item and Write to the inner buffer
85    pub async fn send(
86        &self,
87        item: <U as Encoder>::Item,
88    ) -> Result<(), Either<U::Error, io::Error>> {
89        self.io.send(item, &self.codec).await
90    }
91}
92
93impl<U> fmt::Debug for Framed<U>
94where
95    U: fmt::Debug,
96{
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.debug_struct("Framed")
99            .field("codec", &self.codec)
100            .finish()
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use ntex_bytes::Bytes;
107    use ntex_codec::BytesCodec;
108
109    use super::*;
110    use crate::{testing::IoTest, Io};
111
112    #[ntex::test]
113    async fn framed() {
114        let (client, server) = IoTest::create();
115        client.remote_buffer_cap(1024);
116        client.write(b"chunk-0");
117
118        let server = Framed::new(Io::new(server), BytesCodec);
119        server.get_codec();
120        server.get_io();
121        assert!(format!("{server:?}").contains("Framed"));
122
123        let item = server.recv().await.unwrap().unwrap();
124        assert_eq!(item, b"chunk-0".as_ref());
125
126        let data = Bytes::from_static(b"chunk-1");
127        server.send(data).await.unwrap();
128        server.flush(true).await.unwrap();
129        assert_eq!(client.read_any(), b"chunk-1".as_ref());
130
131        server.shutdown().await.unwrap();
132        assert!(client.is_closed());
133    }
134}