simple_stream/
plain.rs

1// Copyright 2015 Nathan Sizemore <nathanrsizemore@gmail.com>
2//
3// This Source Code Form is subject to the terms of the
4// Mozilla Public License, v. 2.0. If a copy of the MPL was not
5// distributed with this file, You can obtain one at
6// http://mozilla.org/MPL/2.0/.
7
8use std::io::{Error, ErrorKind, Read, Write};
9use std::marker::PhantomData;
10use std::mem;
11use std::os::unix::io::{AsRawFd, RawFd};
12
13// use libc;
14// use errno::errno;
15
16use crate::frame::{Frame, FrameBuilder};
17
18use super::{Blocking, NonBlocking};
19
20const BUF_SIZE: usize = 1024;
21
22/// Plain text stream.
23#[derive(Clone)]
24pub struct Plain<S, FB>
25where
26    S: Read + Write,
27    FB: FrameBuilder,
28{
29    inner: S,
30    rx_buf: Vec<u8>,
31    tx_buf: Vec<u8>,
32    phantom: PhantomData<FB>,
33}
34
35impl<S, FB> Plain<S, FB>
36where
37    S: Read + Write,
38    FB: FrameBuilder,
39{
40    /// Creates a new plain text stream.
41    pub fn new(stream: S) -> Plain<S, FB> {
42        Plain {
43            inner: stream,
44            rx_buf: Vec::<u8>::with_capacity(BUF_SIZE),
45            tx_buf: Vec::<u8>::with_capacity(BUF_SIZE),
46            phantom: PhantomData,
47        }
48    }
49}
50
51impl<S, FB> Blocking for Plain<S, FB>
52where
53    S: Read + Write,
54    FB: FrameBuilder,
55{
56    fn b_recv(&mut self) -> Result<Box<dyn Frame>, Error> {
57        // Empty anything that is in our buffer already from any previous reads
58        match FB::from_bytes(&mut self.rx_buf) {
59            Some(boxed_frame) => {
60                debug!("Complete frame read");
61                return Ok(boxed_frame);
62            }
63            None => {}
64        };
65
66        loop {
67            let mut buf = [0u8; BUF_SIZE];
68            let read_result = self.inner.read(&mut buf);
69            if read_result.is_err() {
70                let err = read_result.unwrap_err();
71                return Err(err);
72            }
73
74            let num_read = read_result.unwrap();
75            trace!("Read {} byte(s)", num_read);
76            self.rx_buf.extend_from_slice(&buf[0..num_read]);
77
78            match FB::from_bytes(&mut self.rx_buf) {
79                Some(boxed_frame) => {
80                    debug!("Complete frame read");
81                    return Ok(boxed_frame);
82                }
83                None => {}
84            };
85        }
86    }
87
88    fn b_send(&mut self, frame: &dyn Frame) -> Result<(), Error> {
89        let out_buf = frame.to_bytes();
90        let write_result = self.inner.write(&out_buf[..]);
91        if write_result.is_err() {
92            let err = write_result.unwrap_err();
93            return Err(err);
94        }
95
96        trace!("Wrote {} byte(s)", write_result.unwrap());
97
98        Ok(())
99    }
100}
101
102impl<S, FB> NonBlocking for Plain<S, FB>
103where
104    S: Read + Write,
105    FB: FrameBuilder,
106{
107    fn nb_recv(&mut self) -> Result<Vec<Box<dyn Frame>>, Error> {
108        loop {
109            let mut buf = [0u8; BUF_SIZE];
110            let read_result = self.inner.read(&mut buf);
111            if read_result.is_err() {
112                let err = read_result.unwrap_err();
113                if err.kind() == ErrorKind::WouldBlock {
114                    break;
115                }
116                return Err(err);
117            }
118
119            let num_read = read_result.unwrap();
120            trace!("Read {} byte(s)", num_read);
121            self.rx_buf.extend_from_slice(&buf[0..num_read]);
122        }
123
124        let mut ret_buf = Vec::<Box<dyn Frame>>::with_capacity(5);
125        while let Some(boxed_frame) = FB::from_bytes(&mut self.rx_buf) {
126            debug!("Complete frame read");
127            ret_buf.push(boxed_frame);
128        }
129
130        if ret_buf.len() > 0 {
131            debug!("Read {} frame(s)", ret_buf.len());
132            return Ok(ret_buf);
133        }
134
135        Err(Error::new(ErrorKind::WouldBlock, "WouldBlock"))
136    }
137
138    fn nb_send(&mut self, frame: &dyn Frame) -> Result<(), Error> {
139        self.tx_buf.extend_from_slice(&frame.to_bytes()[..]);
140
141        let mut out_buf = Vec::<u8>::with_capacity(BUF_SIZE);
142        mem::swap(&mut self.tx_buf, &mut out_buf);
143
144        let write_result = self.inner.write(&out_buf[..]);
145        if write_result.is_err() {
146            let err = write_result.unwrap_err();
147            return Err(err);
148        }
149
150        let num_written = write_result.unwrap();
151        if num_written == 0 {
152            return Err(Error::new(ErrorKind::Other, "Write returned zero"));
153        }
154
155        trace!(
156            "Tried to write {} byte(s) wrote {} byte(s)",
157            out_buf.len(),
158            num_written
159        );
160
161        if num_written < out_buf.len() {
162            let out_buf_len = out_buf.len();
163            self.tx_buf
164                .extend_from_slice(&out_buf[num_written..out_buf_len]);
165
166            return Err(Error::new(ErrorKind::WouldBlock, "WouldBlock"));
167        }
168
169        Ok(())
170    }
171}
172
173impl<S, FB> AsRawFd for Plain<S, FB>
174where
175    S: Read + Write + AsRawFd,
176    FB: FrameBuilder,
177{
178    fn as_raw_fd(&self) -> RawFd {
179        self.inner.as_raw_fd()
180    }
181}