1use std::io;
2use std::fmt;
3use std::mem;
4
5use futures::{Async, Future, Poll};
6use futures::sync::{BiLock, BiLockAcquired, BiLockAcquire};
7use tokio_io::{AsyncRead, AsyncWrite};
8
9use frame;
10use {Buf, Encode, Decode, ReadFramed, WriteFramed};
11
12struct Shared<S> {
13 socket: S,
14 done: bool,
15}
16
17pub struct ReadBuf<S> {
19 pub in_buf: Buf,
20 shared: BiLock<Shared<S>>,
21}
22
23pub struct WriteBuf<S> {
25 pub out_buf: Buf,
26 shared: BiLock<Shared<S>>,
27}
28
29pub struct WriteRaw<S> {
37 io: BiLockAcquired<Shared<S>>,
38}
39
40pub struct FutureWriteRaw<S>(WriteRawFutState<S>);
42
43enum WriteRawFutState<S> {
44 Flushing(WriteBuf<S>),
45 Locking(BiLockAcquire<Shared<S>>),
46 Done,
47}
48
49pub fn create<S>(in_buf: Buf, out_buf: Buf, socket: S, done: bool)
50 -> (WriteBuf<S>, ReadBuf<S>)
51{
52 let (a, b) = BiLock::new(Shared {
53 socket: socket,
54 done: done,
55 });
56 return (
57 WriteBuf {
58 out_buf: in_buf,
59 shared: b,
60 },
61 ReadBuf {
62 in_buf: out_buf,
63 shared: a,
64 });
65}
66
67impl<S> ReadBuf<S> {
68 pub fn read(&mut self) -> Result<usize, io::Error>
84 where S: AsyncRead
85 {
86 if let Async::Ready(ref mut s) = self.shared.poll_lock() {
87 match self.in_buf.read_from(&mut s.socket) {
88 Ok(0) => {
89 s.done = true;
90 Ok(0)
91 }
92 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(0),
93 Err(ref e)
94 if e.kind() == io::ErrorKind::BrokenPipe ||
95 e.kind() == io::ErrorKind::ConnectionReset
96 => {
97 s.done = true;
98 Ok(0)
99 }
100 result => result,
101 }
102 } else {
103 Ok(0)
104 }
105 }
106
107 pub fn done(&self) -> bool {
112 if let Async::Ready(ref mut s) = self.shared.poll_lock() {
113 return s.done;
114 } else {
115 return false;
116 }
117 }
118
119 pub fn framed<D: Decode>(self, codec: D) -> ReadFramed<S, D> {
120 frame::read_framed(self, codec)
121 }
122}
123
124impl<S> WriteBuf<S> {
125 pub fn flush(&mut self) -> Result<(), io::Error>
133 where S: AsyncWrite
134 {
135 if let Async::Ready(ref mut s) = self.shared.poll_lock() {
136 loop {
137 if self.out_buf.len() == 0 {
138 break;
139 }
140 match self.out_buf.write_to(&mut s.socket) {
141 Ok(0) => break,
142 Ok(_) => continue,
143 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
144 break;
145 }
146 Err(ref e)
147 if e.kind() == io::ErrorKind::BrokenPipe ||
148 e.kind() == io::ErrorKind::ConnectionReset
149 => {
150 s.done = true;
151 break;
152 }
153 Err(e) => {
154 return Err(e);
155 },
156 }
157 }
158 match s.socket.flush() {
161 Ok(()) => Ok(()),
162 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
163 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe ||
164 e.kind() == io::ErrorKind::ConnectionReset
165 => {
166 s.done = true;
167 Ok(())
168 }
169 Err(e) => Err(e),
170 }
171 } else {
172 Ok(())
173 }
174 }
175
176 pub fn done(&self) -> bool {
181 if let Async::Ready(ref mut s) = self.shared.poll_lock() {
182 return s.done;
183 } else {
184 return false;
185 }
186 }
187
188 pub fn borrow_raw(self) -> FutureWriteRaw<S> {
198 if self.out_buf.len() == 0 {
199 FutureWriteRaw(WriteRawFutState::Locking(self.shared.lock()))
200 } else {
201 FutureWriteRaw(WriteRawFutState::Flushing(self))
202 }
203 }
204
205 pub fn framed<E: Encode>(self, codec: E) -> WriteFramed<S, E> {
206 frame::write_framed(self, codec)
207 }
208}
209
210impl<S> WriteRaw<S> {
211 pub fn into_buf(self) -> WriteBuf<S> {
213 WriteBuf {
214 out_buf: Buf::new(),
215 shared: self.io.unlock(),
216 }
217 }
218 pub fn get_ref(&self) -> &S {
219 &self.io.socket
220 }
221 pub fn get_mut(&mut self) -> &mut S {
222 &mut self.io.socket
223 }
224}
225
226impl<S: AsyncWrite> Future for FutureWriteRaw<S> {
227 type Item = WriteRaw<S>;
228 type Error = io::Error;
229 fn poll(&mut self) -> Poll<WriteRaw<S>, io::Error> {
230 use self::WriteRawFutState::*;
231 self.0 = match mem::replace(&mut self.0, Done) {
232 Flushing(mut buf) => {
233 buf.flush()?;
234 if buf.out_buf.len() == 0 {
235 let mut lock = buf.shared.lock();
236 match lock.poll().expect("lock never fails") {
237 Async::Ready(s) => {
238 return Ok(Async::Ready(WriteRaw { io: s }));
239 }
240 Async::NotReady => {}
241 }
242 Locking(lock)
243 } else {
244 Flushing(buf)
245 }
246 }
247 Locking(mut f) => {
248 match f.poll().expect("lock never fails") {
249 Async::Ready(s) => {
250 return Ok(Async::Ready(WriteRaw { io: s }));
251 }
252 Async::NotReady => {}
253 }
254 Locking(f)
255 }
256 Done => panic!("future polled after completion"),
257 };
258 return Ok(Async::NotReady);
259 }
260}
261
262impl<S: AsyncWrite> io::Write for WriteRaw<S> {
263 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
264 self.io.socket.write(buf)
265 }
266 fn flush(&mut self) -> io::Result<()> {
267 self.io.socket.flush()
268 }
269}
270impl<S: AsyncWrite> AsyncWrite for WriteRaw<S> {
271 fn shutdown(&mut self) -> Poll<(), io::Error> {
272 self.io.socket.shutdown()
273 }
274}
275
276impl<S> fmt::Debug for ReadBuf<S> {
277 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
278 write!(f, "ReadBuf {{ in: {}b }}", self.in_buf.len())
279 }
280}
281
282impl<S> fmt::Debug for WriteBuf<S> {
283 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
284 write!(f, "WriteBuf {{ out: {}b }}", self.out_buf.len())
285 }
286}