1use std::io::{Error, ErrorKind, Read, Write};
9use std::marker::PhantomData;
10use std::mem;
11use std::os::unix::io::{AsRawFd, RawFd};
12
13use crate::frame::{Frame, FrameBuilder};
17
18use super::{Blocking, NonBlocking};
19
20const BUF_SIZE: usize = 1024;
21
22#[derive(Clone)]
24pub struct Plain<S, FB>
25where
26 S: Read + Write,
27 FB: FrameBuilder,
28{
29 inner: S,
30 rx_buf: Vec<u8>,
31 tx_buf: Vec<u8>,
32 phantom: PhantomData<FB>,
33}
34
35impl<S, FB> Plain<S, FB>
36where
37 S: Read + Write,
38 FB: FrameBuilder,
39{
40 pub fn new(stream: S) -> Plain<S, FB> {
42 Plain {
43 inner: stream,
44 rx_buf: Vec::<u8>::with_capacity(BUF_SIZE),
45 tx_buf: Vec::<u8>::with_capacity(BUF_SIZE),
46 phantom: PhantomData,
47 }
48 }
49}
50
51impl<S, FB> Blocking for Plain<S, FB>
52where
53 S: Read + Write,
54 FB: FrameBuilder,
55{
56 fn b_recv(&mut self) -> Result<Box<dyn Frame>, Error> {
57 match FB::from_bytes(&mut self.rx_buf) {
59 Some(boxed_frame) => {
60 debug!("Complete frame read");
61 return Ok(boxed_frame);
62 }
63 None => {}
64 };
65
66 loop {
67 let mut buf = [0u8; BUF_SIZE];
68 let read_result = self.inner.read(&mut buf);
69 if read_result.is_err() {
70 let err = read_result.unwrap_err();
71 return Err(err);
72 }
73
74 let num_read = read_result.unwrap();
75 trace!("Read {} byte(s)", num_read);
76 self.rx_buf.extend_from_slice(&buf[0..num_read]);
77
78 match FB::from_bytes(&mut self.rx_buf) {
79 Some(boxed_frame) => {
80 debug!("Complete frame read");
81 return Ok(boxed_frame);
82 }
83 None => {}
84 };
85 }
86 }
87
88 fn b_send(&mut self, frame: &dyn Frame) -> Result<(), Error> {
89 let out_buf = frame.to_bytes();
90 let write_result = self.inner.write(&out_buf[..]);
91 if write_result.is_err() {
92 let err = write_result.unwrap_err();
93 return Err(err);
94 }
95
96 trace!("Wrote {} byte(s)", write_result.unwrap());
97
98 Ok(())
99 }
100}
101
102impl<S, FB> NonBlocking for Plain<S, FB>
103where
104 S: Read + Write,
105 FB: FrameBuilder,
106{
107 fn nb_recv(&mut self) -> Result<Vec<Box<dyn Frame>>, Error> {
108 loop {
109 let mut buf = [0u8; BUF_SIZE];
110 let read_result = self.inner.read(&mut buf);
111 if read_result.is_err() {
112 let err = read_result.unwrap_err();
113 if err.kind() == ErrorKind::WouldBlock {
114 break;
115 }
116 return Err(err);
117 }
118
119 let num_read = read_result.unwrap();
120 trace!("Read {} byte(s)", num_read);
121 self.rx_buf.extend_from_slice(&buf[0..num_read]);
122 }
123
124 let mut ret_buf = Vec::<Box<dyn Frame>>::with_capacity(5);
125 while let Some(boxed_frame) = FB::from_bytes(&mut self.rx_buf) {
126 debug!("Complete frame read");
127 ret_buf.push(boxed_frame);
128 }
129
130 if ret_buf.len() > 0 {
131 debug!("Read {} frame(s)", ret_buf.len());
132 return Ok(ret_buf);
133 }
134
135 Err(Error::new(ErrorKind::WouldBlock, "WouldBlock"))
136 }
137
138 fn nb_send(&mut self, frame: &dyn Frame) -> Result<(), Error> {
139 self.tx_buf.extend_from_slice(&frame.to_bytes()[..]);
140
141 let mut out_buf = Vec::<u8>::with_capacity(BUF_SIZE);
142 mem::swap(&mut self.tx_buf, &mut out_buf);
143
144 let write_result = self.inner.write(&out_buf[..]);
145 if write_result.is_err() {
146 let err = write_result.unwrap_err();
147 return Err(err);
148 }
149
150 let num_written = write_result.unwrap();
151 if num_written == 0 {
152 return Err(Error::new(ErrorKind::Other, "Write returned zero"));
153 }
154
155 trace!(
156 "Tried to write {} byte(s) wrote {} byte(s)",
157 out_buf.len(),
158 num_written
159 );
160
161 if num_written < out_buf.len() {
162 let out_buf_len = out_buf.len();
163 self.tx_buf
164 .extend_from_slice(&out_buf[num_written..out_buf_len]);
165
166 return Err(Error::new(ErrorKind::WouldBlock, "WouldBlock"));
167 }
168
169 Ok(())
170 }
171}
172
173impl<S, FB> AsRawFd for Plain<S, FB>
174where
175 S: Read + Write + AsRawFd,
176 FB: FrameBuilder,
177{
178 fn as_raw_fd(&self) -> RawFd {
179 self.inner.as_raw_fd()
180 }
181}