tk_bufstream/iobuf.rs
1use std::io;
2use std::fmt;
3#[cfg(unix)]
4use std::os::unix::io::{AsRawFd, RawFd};
5
6use tokio_io::{AsyncRead, AsyncWrite};
7
8use flushed::{Flushed, flushed};
9use frame;
10use split;
11use {Buf, Framed, Encode, Decode};
12
13/// A wrapper for full-duplex stream
14pub struct IoBuf<S> {
15 pub in_buf: Buf,
16 pub out_buf: Buf,
17 socket: S,
18 done: bool,
19}
20
21/// Main trait of a stream (meaning socket) with input and output buffers
22///
23/// This is ought to be similar to `tokio_core::Io` but with buffers
24impl<S> IoBuf<S> {
25 /// Create a new IoBuf object with empty buffers
26 pub fn new(sock: S) -> IoBuf<S> {
27 IoBuf {
28 in_buf: Buf::new(),
29 out_buf: Buf::new(),
30 socket: sock,
31 done: false,
32 }
33 }
34 /// Read a chunk of data into a buffer
35 ///
36 /// The data just read can then be found in `self.in_buf`.
37 ///
38 /// This method does just one read. Because you are ought to try parse
39 /// request after every read rather than reading a lot of the data in
40 /// memory.
41 ///
42 /// This method returns `0` when no bytes are read, both when WouldBlock
43 /// occurred and when connection has been closed. You may then use
44 /// `self.done()` to distinguish from these conditions.
45 pub fn read(&mut self) -> Result<usize, io::Error>
46 where S: AsyncRead
47 {
48 match self.in_buf.read_from(&mut self.socket) {
49 Ok(0) => {
50 self.done = true;
51 Ok(0)
52 }
53 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(0),
54 Err(ref e)
55 if e.kind() == io::ErrorKind::BrokenPipe ||
56 e.kind() == io::ErrorKind::ConnectionReset
57 => {
58 self.done = true;
59 Ok(0)
60 }
61 result => result,
62 }
63 }
64
65 /// Write data in the output buffer to actual stream
66 ///
67 /// You should put the data to be sent into `self.out_buf` before flush
68 pub fn flush(&mut self) -> Result<(), io::Error>
69 where S: AsyncWrite
70 {
71 loop {
72 if self.out_buf.len() == 0 {
73 break;
74 }
75 match self.out_buf.write_to(&mut self.socket) {
76 Ok(0) => break,
77 Ok(_) => continue,
78 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
79 break;
80 }
81 Err(ref e)
82 if e.kind() == io::ErrorKind::BrokenPipe ||
83 e.kind() == io::ErrorKind::ConnectionReset
84 => {
85 self.done = true;
86 break;
87 }
88 Err(e) => {
89 return Err(e);
90 },
91 }
92 }
93 // This probably aways does nothing, but we have to support the full
94 // Io protocol
95 match self.socket.flush() {
96 Ok(()) => Ok(()),
97 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
98 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe ||
99 e.kind() == io::ErrorKind::ConnectionReset
100 => {
101 self.done = true;
102 Ok(())
103 }
104 Err(e) => Err(e),
105 }
106 }
107
108 /// Returns true when connection is closed by peer
109 pub fn done(&self) -> bool {
110 return self.done;
111 }
112
113 /// Returns a future which resolves to this stream when output buffer is
114 /// flushed
115 pub fn flushed(self) -> Flushed<S>
116 where S: AsyncWrite
117 {
118 flushed(self)
119 }
120
121 /// Provides a `Stream` and `Sink` interface for reading and writing to
122 /// this `IoBuf` object, using `Decode` and `Encode` to read and write the
123 /// raw data.
124 ///
125 /// Raw I/O objects work with byte sequences, but higher-level code
126 /// usually wants to batch these into meaningful chunks, called "frames".
127 /// This method layers framing on top of an I/O object, by using the
128 /// `Encode` and `Decode` traits:
129 ///
130 /// - `Encode` interprets frames we want to send into bytes;
131 /// - `Decode` interprets incoming bytes into a stream of frames.
132 ///
133 /// Note that the incoming and outgoing frame types may be distinct.
134 ///
135 /// This function returns a *single* object that is both `Stream` and
136 /// `Sink`; grouping this into a single object is often useful for
137 /// layering things like gzip or TLS, which require both read and write
138 /// access to the underlying object.
139 ///
140 /// If you want to work more directly with the streams and sink, consider
141 /// calling `split` on the `Framed` returned by this method, which will
142 /// break them into separate objects, allowing them to interact more
143 /// easily.
144 pub fn framed<C: Encode + Decode>(self, codec: C) -> Framed<S, C>
145 where Self: Sized,
146 S: AsyncRead + AsyncWrite
147 {
148 frame::framed(self, codec)
149 }
150
151 pub fn split(self) -> (split::WriteBuf<S>, split::ReadBuf<S>)
152 where S: AsyncRead + AsyncWrite
153 {
154 split::create(self.in_buf, self.out_buf, self.socket, self.done)
155 }
156}
157
158impl<S> fmt::Debug for IoBuf<S> {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 if self.done {
161 write!(f, "IoBuf {{ [done], in: {}b, out: {}b }}",
162 self.in_buf.len(), self.out_buf.len())
163 } else {
164 write!(f, "IoBuf {{ in: {}b, out: {}b }}",
165 self.in_buf.len(), self.out_buf.len())
166 }
167 }
168}
169
170#[cfg(unix)]
171impl<S: AsRawFd> AsRawFd for IoBuf<S> {
172 fn as_raw_fd(&self) -> RawFd {
173 self.socket.as_raw_fd()
174 }
175}
176
177impl<S: AsyncWrite> io::Write for IoBuf<S> {
178 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
179 // TODO(tailhook) may try to write to the buf directly if
180 // output buffer is empty
181 self.out_buf.write(buf)?;
182 self.flush()?;
183 Ok(buf.len())
184 }
185 fn flush(&mut self) -> Result<(), io::Error> {
186 self.flush()
187 }
188}