1use std::{fmt, io};
2
3use ntex_codec::{Decoder, Encoder};
4use ntex_util::future::Either;
5
6use crate::IoBoxed;
7
8pub struct Framed<U> {
12 io: IoBoxed,
13 codec: U,
14}
15
16impl<U> Framed<U> {
17 pub fn new<Io>(io: Io, codec: U) -> Framed<U>
20 where
21 IoBoxed: From<Io>,
22 {
23 Framed {
24 codec,
25 io: IoBoxed::from(io),
26 }
27 }
28
29 #[inline]
30 pub fn get_io(&self) -> &IoBoxed {
32 &self.io
33 }
34
35 #[inline]
36 pub fn get_codec(&self) -> &U {
38 &self.codec
39 }
40
41 #[inline]
42 pub fn into_inner(self) -> (IoBoxed, U) {
44 (self.io, self.codec)
45 }
46}
47
48impl<U> Framed<U>
49where
50 U: Decoder + Encoder,
51{
52 pub async fn flush(&self, full: bool) -> Result<(), io::Error> {
56 self.io.flush(full).await
57 }
58
59 pub async fn shutdown(&self) -> Result<(), io::Error> {
61 self.io.shutdown().await
62 }
63}
64
65impl<U> Framed<U>
66where
67 U: Decoder,
68{
69 #[inline]
70 pub async fn recv(&self) -> Result<Option<U::Item>, Either<U::Error, io::Error>> {
72 self.io.recv(&self.codec).await
73 }
74}
75
76impl<U> Framed<U>
77where
78 U: Encoder,
79{
80 #[inline]
81 pub async fn send(
83 &self,
84 item: <U as Encoder>::Item,
85 ) -> Result<(), Either<U::Error, io::Error>> {
86 self.io.send(item, &self.codec).await
87 }
88}
89
90impl<U> fmt::Debug for Framed<U>
91where
92 U: fmt::Debug,
93{
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 f.debug_struct("Framed")
96 .field("codec", &self.codec)
97 .finish()
98 }
99}
100
101#[cfg(test)]
102mod tests {
103 use ntex_bytes::Bytes;
104 use ntex_codec::BytesCodec;
105
106 use super::*;
107 use crate::{Io, testing::IoTest};
108
109 #[ntex::test]
110 async fn framed() {
111 let (client, server) = IoTest::create();
112 client.remote_buffer_cap(1024);
113 client.write(b"chunk-0");
114
115 let server = Framed::new(Io::from(server), BytesCodec);
116 server.get_codec();
117 server.get_io();
118 assert!(format!("{server:?}").contains("Framed"));
119
120 let item = server.recv().await.unwrap().unwrap();
121 assert_eq!(item, b"chunk-0".as_ref());
122
123 let data = Bytes::from_static(b"chunk-1");
124 server.send(data).await.unwrap();
125 server.flush(true).await.unwrap();
126 assert_eq!(client.read_any(), b"chunk-1".as_ref());
127
128 server.shutdown().await.unwrap();
129 assert!(client.is_closed());
130 }
131}