1use std::{fmt, io};
2
3use ntex_codec::{Decoder, Encoder};
4use ntex_util::future::Either;
5
6use crate::IoBoxed;
7
8pub struct Framed<U> {
12 io: IoBoxed,
13 codec: U,
14}
15
16impl<U> Framed<U> {
17 #[inline]
18 pub fn new<Io>(io: Io, codec: U) -> Framed<U>
21 where
22 IoBoxed: From<Io>,
23 {
24 Framed {
25 codec,
26 io: IoBoxed::from(io),
27 }
28 }
29
30 #[inline]
31 pub fn get_io(&self) -> &IoBoxed {
33 &self.io
34 }
35
36 #[inline]
37 pub fn get_codec(&self) -> &U {
39 &self.codec
40 }
41
42 #[inline]
43 pub fn into_inner(self) -> (IoBoxed, U) {
45 (self.io, self.codec)
46 }
47}
48
49impl<U> Framed<U>
50where
51 U: Decoder + Encoder,
52{
53 #[inline]
54 pub async fn flush(&self, full: bool) -> Result<(), io::Error> {
58 self.io.flush(full).await
59 }
60
61 #[inline]
62 pub async fn shutdown(&self) -> Result<(), io::Error> {
64 self.io.shutdown().await
65 }
66}
67
68impl<U> Framed<U>
69where
70 U: Decoder,
71{
72 #[inline]
73 pub async fn recv(&self) -> Result<Option<U::Item>, Either<U::Error, io::Error>> {
75 self.io.recv(&self.codec).await
76 }
77}
78
79impl<U> Framed<U>
80where
81 U: Encoder,
82{
83 #[inline]
84 pub async fn send(
86 &self,
87 item: <U as Encoder>::Item,
88 ) -> Result<(), Either<U::Error, io::Error>> {
89 self.io.send(item, &self.codec).await
90 }
91}
92
93impl<U> fmt::Debug for Framed<U>
94where
95 U: fmt::Debug,
96{
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.debug_struct("Framed")
99 .field("codec", &self.codec)
100 .finish()
101 }
102}
103
#[cfg(test)]
mod tests {
    use ntex_bytes::Bytes;
    use ntex_codec::BytesCodec;

    use super::*;
    use crate::{testing::IoTest, Io};

    #[ntex::test]
    async fn framed() {
        // Walks a `Framed` through construction, accessors, Debug, recv,
        // send, flush and shutdown against an in-memory test transport.
        let (client, server_io) = IoTest::create();
        client.remote_buffer_cap(1024);
        client.write(b"chunk-0");

        let server = Framed::new(Io::new(server_io), BytesCodec);

        // Accessors and Debug are only smoke-checked here.
        let _ = server.get_codec();
        let _ = server.get_io();
        let rendered = format!("{server:?}");
        assert!(rendered.contains("Framed"));

        // Inbound: the bytes written by the client arrive as one item.
        let received = server.recv().await.unwrap().unwrap();
        assert_eq!(received, b"chunk-0".as_ref());

        // Outbound: send, flush, then observe the bytes on the client side.
        server.send(Bytes::from_static(b"chunk-1")).await.unwrap();
        server.flush(true).await.unwrap();
        assert_eq!(client.read_any(), b"chunk-1".as_ref());

        // Shutting down the server side must close the client's view too.
        server.shutdown().await.unwrap();
        assert!(client.is_closed());
    }
}