tk_bufstream/
split.rs

1use std::io;
2use std::fmt;
3use std::mem;
4
5use futures::{Async, Future, Poll};
6use futures::sync::{BiLock, BiLockAcquired, BiLockAcquire};
7use tokio_io::{AsyncRead, AsyncWrite};
8
9use frame;
10use {Buf, Encode, Decode, ReadFramed, WriteFramed};
11
12struct Shared<S> {
13    socket: S,
14    done: bool,
15}
16
17/// An input counterpart of IoBuf when the latter is split
18pub struct ReadBuf<S> {
19    pub in_buf: Buf,
20    shared: BiLock<Shared<S>>,
21}
22
23/// An output counterpart of IoBuf when the latter is split
24pub struct WriteBuf<S> {
25    pub out_buf: Buf,
26    shared: BiLock<Shared<S>>,
27}
28
29/// A structure that locks IoBuf and allows you to write to the socket directly
30///
31/// Where "directly" means without buffering and presumably with some zero-copy
32/// method like `sendfile()` or `splice()`
33///
34/// Note: when `WriteRaw` is alive `ReadBuf` is alive, but locked and will
35/// wake up as quick as `WriteRaw` is converted back to `WriteBuf`.
36pub struct WriteRaw<S> {
37    io: BiLockAcquired<Shared<S>>,
38}
39
40/// A future which converts `WriteBuf` into `WriteRaw`
41pub struct FutureWriteRaw<S>(WriteRawFutState<S>);
42
43enum WriteRawFutState<S> {
44    Flushing(WriteBuf<S>),
45    Locking(BiLockAcquire<Shared<S>>),
46    Done,
47}
48
49pub fn create<S>(in_buf: Buf, out_buf: Buf, socket: S, done: bool)
50    -> (WriteBuf<S>, ReadBuf<S>)
51{
52    let (a, b) = BiLock::new(Shared {
53        socket: socket,
54        done: done,
55    });
56    return (
57        WriteBuf {
58            out_buf: in_buf,
59            shared: b,
60        },
61        ReadBuf {
62            in_buf: out_buf,
63            shared: a,
64        });
65}
66
67impl<S> ReadBuf<S> {
68    /// Read a chunk of data into a buffer
69    ///
70    /// The data just read can then be found in `self.in_buf`.
71    ///
72    /// This method does just one read. Because you are ought to try parse
73    /// request after every read rather than reading a lot of the data in
74    /// memory.
75    ///
76    /// This method returns `0` when no bytes are read, both when WouldBlock
77    /// occurred and when connection has been closed. You may then use
78    /// `self.done()` to distinguish from these conditions.
79    ///
80    /// Note: this method silently assumes that you will call it on poll
81    /// every time until self.done() returns false. I.e. it behaves as Async
82    /// method but does't return Async value to allow simpler handling
83    pub fn read(&mut self) -> Result<usize, io::Error>
84        where S: AsyncRead
85    {
86        if let Async::Ready(ref mut s) = self.shared.poll_lock() {
87            match self.in_buf.read_from(&mut s.socket) {
88                Ok(0) => {
89                    s.done = true;
90                    Ok(0)
91                }
92                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(0),
93                Err(ref e)
94                    if e.kind() == io::ErrorKind::BrokenPipe ||
95                       e.kind() == io::ErrorKind::ConnectionReset
96                => {
97                    s.done = true;
98                    Ok(0)
99                }
100                result => result,
101            }
102        } else {
103            Ok(0)
104        }
105    }
106
107    /// Returns true when connection is closed by peer
108    ///
109    /// Note: this method returns false and schedules a wakeup if connecion
110    /// is currently locked
111    pub fn done(&self) -> bool {
112        if let Async::Ready(ref mut s) = self.shared.poll_lock() {
113            return s.done;
114        } else {
115            return false;
116        }
117    }
118
119    pub fn framed<D: Decode>(self, codec: D) -> ReadFramed<S, D> {
120        frame::read_framed(self, codec)
121    }
122}
123
124impl<S> WriteBuf<S> {
125    /// Write data in the output buffer to actual stream
126    ///
127    /// You should put the data to be sent into `self.out_buf` before flush
128    ///
129    /// Note: this method silently assumes that you will call it on poll
130    /// every time until self.done() returns false. I.e. it behaves as Async
131    /// method but does't return Async value to allow simpler handling
132    pub fn flush(&mut self) -> Result<(), io::Error>
133        where S: AsyncWrite
134    {
135        if let Async::Ready(ref mut s) = self.shared.poll_lock() {
136            loop {
137                if self.out_buf.len() == 0 {
138                    break;
139                }
140                match self.out_buf.write_to(&mut s.socket) {
141                    Ok(0) => break,
142                    Ok(_) => continue,
143                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
144                        break;
145                    }
146                    Err(ref e)
147                        if e.kind() == io::ErrorKind::BrokenPipe ||
148                           e.kind() == io::ErrorKind::ConnectionReset
149                    => {
150                        s.done = true;
151                        break;
152                    }
153                    Err(e) => {
154                        return Err(e);
155                    },
156                }
157            }
158            // This probably always does nothing, but we have to support the
159            // full Io protocol
160            match s.socket.flush() {
161                Ok(()) => Ok(()),
162                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
163                Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe ||
164                              e.kind() == io::ErrorKind::ConnectionReset
165                => {
166                    s.done = true;
167                    Ok(())
168                }
169                Err(e) => Err(e),
170            }
171        } else {
172            Ok(())
173        }
174    }
175
176    /// Returns true when connection is closed by peer
177    ///
178    /// Note: this method returns false and schedules a wakeup if connecion
179    /// is currently locked
180    pub fn done(&self) -> bool {
181        if let Async::Ready(ref mut s) = self.shared.poll_lock() {
182            return s.done;
183        } else {
184            return false;
185        }
186    }
187
188    /// Returns a future which will resolve into WriteRaw
189    ///
190    /// This future resolves when after two conditions:
191    ///
192    /// 1. Output buffer is fully flushed to the network (i.e. OS buffers)
193    /// 2. Internal BiLock is locked
194    ///
195    /// Note: `WriteRaw` will lock the underlying stream for the whole
196    /// lifetime of the `WriteRaw`.
197    pub fn borrow_raw(self) -> FutureWriteRaw<S> {
198        if self.out_buf.len() == 0 {
199            FutureWriteRaw(WriteRawFutState::Locking(self.shared.lock()))
200        } else {
201            FutureWriteRaw(WriteRawFutState::Flushing(self))
202        }
203    }
204
205    pub fn framed<E: Encode>(self, codec: E) -> WriteFramed<S, E> {
206        frame::write_framed(self, codec)
207    }
208}
209
210impl<S> WriteRaw<S> {
211    /// Turn raw writer back into buffered and release internal BiLock
212    pub fn into_buf(self) -> WriteBuf<S> {
213        WriteBuf {
214            out_buf: Buf::new(),
215            shared: self.io.unlock(),
216        }
217    }
218    pub fn get_ref(&self) -> &S {
219        &self.io.socket
220    }
221    pub fn get_mut(&mut self) -> &mut S {
222        &mut self.io.socket
223    }
224}
225
226impl<S: AsyncWrite> Future for FutureWriteRaw<S> {
227    type Item = WriteRaw<S>;
228    type Error = io::Error;
229    fn poll(&mut self) -> Poll<WriteRaw<S>, io::Error> {
230        use self::WriteRawFutState::*;
231        self.0 = match mem::replace(&mut self.0, Done) {
232            Flushing(mut buf) => {
233                buf.flush()?;
234                if buf.out_buf.len() == 0 {
235                    let mut lock = buf.shared.lock();
236                    match lock.poll().expect("lock never fails") {
237                        Async::Ready(s) => {
238                            return Ok(Async::Ready(WriteRaw { io: s }));
239                        }
240                        Async::NotReady => {}
241                    }
242                    Locking(lock)
243                } else {
244                    Flushing(buf)
245                }
246            }
247            Locking(mut f) => {
248                match f.poll().expect("lock never fails") {
249                    Async::Ready(s) => {
250                        return Ok(Async::Ready(WriteRaw { io: s }));
251                    }
252                    Async::NotReady => {}
253                }
254                Locking(f)
255            }
256            Done => panic!("future polled after completion"),
257        };
258        return Ok(Async::NotReady);
259    }
260}
261
262impl<S: AsyncWrite> io::Write for WriteRaw<S> {
263    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
264        self.io.socket.write(buf)
265    }
266    fn flush(&mut self) -> io::Result<()> {
267        self.io.socket.flush()
268    }
269}
270impl<S: AsyncWrite> AsyncWrite for WriteRaw<S> {
271    fn shutdown(&mut self) -> Poll<(), io::Error> {
272        self.io.socket.shutdown()
273    }
274}
275
276impl<S> fmt::Debug for ReadBuf<S> {
277    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
278        write!(f, "ReadBuf {{ in: {}b }}", self.in_buf.len())
279    }
280}
281
282impl<S> fmt::Debug for WriteBuf<S> {
283    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
284        write!(f, "WriteBuf {{ out: {}b }}", self.out_buf.len())
285    }
286}