tk_bufstream/
iobuf.rs

1use std::io;
2use std::fmt;
3#[cfg(unix)]
4use std::os::unix::io::{AsRawFd, RawFd};
5
6use tokio_io::{AsyncRead, AsyncWrite};
7
8use flushed::{Flushed, flushed};
9use frame;
10use split;
11use {Buf, Framed, Encode, Decode};
12
/// A wrapper for full-duplex stream
///
/// Pairs a socket with one buffer for incoming data and one buffer for
/// outgoing data.
pub struct IoBuf<S> {
    /// Input buffer; filled by `read()` with data received from the socket
    pub in_buf: Buf,
    /// Output buffer; `flush()` writes its contents to the socket
    pub out_buf: Buf,
    /// The underlying stream (socket) both buffers are attached to
    socket: S,
    /// Set once the connection is observed to be closed: on a zero-length
    /// read, or on a `BrokenPipe` / `ConnectionReset` error. Exposed via
    /// `done()`.
    done: bool,
}
20
21/// Main trait of a stream (meaning socket) with input and output buffers
22///
23/// This is ought to be similar to `tokio_core::Io` but with buffers
24impl<S> IoBuf<S> {
25    /// Create a new IoBuf object with empty buffers
26    pub fn new(sock: S) -> IoBuf<S> {
27        IoBuf {
28            in_buf: Buf::new(),
29            out_buf: Buf::new(),
30            socket: sock,
31            done: false,
32        }
33    }
34    /// Read a chunk of data into a buffer
35    ///
36    /// The data just read can then be found in `self.in_buf`.
37    ///
38    /// This method does just one read. Because you are ought to try parse
39    /// request after every read rather than reading a lot of the data in
40    /// memory.
41    ///
42    /// This method returns `0` when no bytes are read, both when WouldBlock
43    /// occurred and when connection has been closed. You may then use
44    /// `self.done()` to distinguish from these conditions.
45    pub fn read(&mut self) -> Result<usize, io::Error>
46        where S: AsyncRead
47    {
48        match self.in_buf.read_from(&mut self.socket) {
49            Ok(0) => {
50                self.done = true;
51                Ok(0)
52            }
53            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(0),
54            Err(ref e)
55                if e.kind() == io::ErrorKind::BrokenPipe ||
56                   e.kind() == io::ErrorKind::ConnectionReset
57            => {
58                self.done = true;
59                Ok(0)
60            }
61            result => result,
62        }
63    }
64
65    /// Write data in the output buffer to actual stream
66    ///
67    /// You should put the data to be sent into `self.out_buf` before flush
68    pub fn flush(&mut self) -> Result<(), io::Error>
69        where S: AsyncWrite
70    {
71        loop {
72            if self.out_buf.len() == 0 {
73                break;
74            }
75            match self.out_buf.write_to(&mut self.socket) {
76                Ok(0) => break,
77                Ok(_) => continue,
78                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
79                    break;
80                }
81                Err(ref e)
82                    if e.kind() == io::ErrorKind::BrokenPipe ||
83                       e.kind() == io::ErrorKind::ConnectionReset
84                => {
85                    self.done = true;
86                    break;
87                }
88                Err(e) => {
89                    return Err(e);
90                },
91            }
92        }
93        // This probably aways does nothing, but we have to support the full
94        // Io protocol
95        match self.socket.flush() {
96            Ok(()) => Ok(()),
97            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
98            Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe ||
99                          e.kind() == io::ErrorKind::ConnectionReset
100            => {
101                self.done = true;
102                Ok(())
103            }
104            Err(e) => Err(e),
105        }
106    }
107
108    /// Returns true when connection is closed by peer
109    pub fn done(&self) -> bool {
110        return self.done;
111    }
112
113    /// Returns a future which resolves to this stream when output buffer is
114    /// flushed
115    pub fn flushed(self) -> Flushed<S>
116        where S: AsyncWrite
117    {
118        flushed(self)
119    }
120
121    /// Provides a `Stream` and `Sink` interface for reading and writing to
122    /// this `IoBuf` object, using `Decode` and `Encode` to read and write the
123    /// raw data.
124    ///
125    /// Raw I/O objects work with byte sequences, but higher-level code
126    /// usually wants to batch these into meaningful chunks, called "frames".
127    /// This method layers framing on top of an I/O object, by using the
128    /// `Encode` and `Decode` traits:
129    ///
130    /// - `Encode` interprets frames we want to send into bytes;
131    /// - `Decode` interprets incoming bytes into a stream of frames.
132    ///
133    /// Note that the incoming and outgoing frame types may be distinct.
134    ///
135    /// This function returns a *single* object that is both `Stream` and
136    /// `Sink`; grouping this into a single object is often useful for
137    /// layering things like gzip or TLS, which require both read and write
138    /// access to the underlying object.
139    ///
140    /// If you want to work more directly with the streams and sink, consider
141    /// calling `split` on the `Framed` returned by this method, which will
142    /// break them into separate objects, allowing them to interact more
143    /// easily.
144    pub fn framed<C: Encode + Decode>(self, codec: C) -> Framed<S, C>
145        where Self: Sized,
146              S: AsyncRead + AsyncWrite
147    {
148        frame::framed(self, codec)
149    }
150
151    pub fn split(self) -> (split::WriteBuf<S>, split::ReadBuf<S>)
152        where S: AsyncRead + AsyncWrite
153    {
154        split::create(self.in_buf, self.out_buf, self.socket, self.done)
155    }
156}
157
158impl<S> fmt::Debug for IoBuf<S> {
159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160        if self.done {
161            write!(f, "IoBuf {{ [done], in: {}b, out: {}b }}",
162                self.in_buf.len(), self.out_buf.len())
163        } else {
164            write!(f, "IoBuf {{ in: {}b, out: {}b }}",
165                self.in_buf.len(), self.out_buf.len())
166        }
167    }
168}
169
170#[cfg(unix)]
171impl<S: AsRawFd> AsRawFd for IoBuf<S> {
172    fn as_raw_fd(&self) -> RawFd {
173        self.socket.as_raw_fd()
174    }
175}
176
177impl<S: AsyncWrite> io::Write for IoBuf<S> {
178    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
179        // TODO(tailhook) may try to write to the buf directly if
180        // output buffer is empty
181        self.out_buf.write(buf)?;
182        self.flush()?;
183        Ok(buf.len())
184    }
185    fn flush(&mut self) -> Result<(), io::Error> {
186        self.flush()
187    }
188}