1use std::fmt;
2use std::io::{self, Read, Write, Cursor, ErrorKind};
3use std::mem;
4use std::net::{self, SocketAddr};
5use std::os::windows::prelude::*;
6use std::sync::{Mutex, MutexGuard};
7
8use net2::{self, TcpBuilder};
9use net::tcp::Shutdown;
10use miow::iocp::CompletionStatus;
11use miow::net::*;
12use winapi::*;
13
14use {Evented, EventSet, PollOpt, Selector, Token};
15use event::IoEvent;
16use sys::windows::selector::{Overlapped, Registration};
17use sys::windows::{wouldblock, Family};
18use sys::windows::from_raw_arc::FromRawArc;
19
20pub struct TcpStream {
21 imp: StreamImp,
25}
26
27pub struct TcpListener {
28 imp: ListenerImp,
29}
30
31#[derive(Clone)]
32struct StreamImp {
33 inner: FromRawArc<StreamIo>,
45}
46
47#[derive(Clone)]
48struct ListenerImp {
49 inner: FromRawArc<ListenerIo>,
50}
51
52struct StreamIo {
53 inner: Mutex<StreamInner>,
54 read: Overlapped, write: Overlapped,
56}
57
58struct ListenerIo {
59 inner: Mutex<ListenerInner>,
60 accept: Overlapped,
61}
62
63struct StreamInner {
64 socket: net::TcpStream,
65 iocp: Registration,
66 deferred_connect: Option<SocketAddr>,
67 read: State<Vec<u8>, Cursor<Vec<u8>>>,
68 write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
69}
70
71struct ListenerInner {
72 socket: net::TcpListener,
73 family: Family,
74 iocp: Registration,
75 accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
76 accept_buf: AcceptAddrsBuf,
77}
78
/// State of one asynchronous operation slot (read, write or accept).
///
/// `T` is what is kept while the operation is in flight, `U` what is kept
/// once a result is available.
enum State<T, U> {
    /// Nothing scheduled and no result waiting.
    Empty,
    /// An operation has been submitted and has not completed yet.
    Pending(T),
    /// A completed result is waiting to be consumed.
    Ready(U),
    /// The operation failed; the error is waiting to be reported.
    Error(io::Error),
}
85
86impl TcpStream {
87 fn new(socket: net::TcpStream,
88 deferred_connect: Option<SocketAddr>) -> TcpStream {
89 TcpStream {
90 imp: StreamImp {
91 inner: FromRawArc::new(StreamIo {
92 read: Overlapped::new(read_done),
93 write: Overlapped::new(write_done),
94 inner: Mutex::new(StreamInner {
95 socket: socket,
96 iocp: Registration::new(),
97 deferred_connect: deferred_connect,
98 read: State::Empty,
99 write: State::Empty,
100 }),
101 }),
102 },
103 }
104 }
105
106 pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
107 -> io::Result<TcpStream> {
108 Ok(TcpStream::new(socket, Some(*addr)))
109 }
110
111 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
112 self.inner().socket.peer_addr()
113 }
114
115 pub fn local_addr(&self) -> io::Result<SocketAddr> {
116 self.inner().socket.local_addr()
117 }
118
119 pub fn try_clone(&self) -> io::Result<TcpStream> {
120 self.inner().socket.try_clone().map(|s| TcpStream::new(s, None))
121 }
122
123 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
124 self.inner().socket.shutdown(how)
125 }
126
127 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
128 net2::TcpStreamExt::set_nodelay(&self.inner().socket, nodelay)
129 }
130
131 pub fn set_keepalive(&self, seconds: Option<u32>) -> io::Result<()> {
132 let dur = seconds.map(|s| s * 1000);
133 net2::TcpStreamExt::set_keepalive_ms(&self.inner().socket, dur)
134 }
135
136 pub fn take_socket_error(&self) -> io::Result<()> {
137 net2::TcpStreamExt::take_error(&self.inner().socket).and_then(|e| {
138 match e {
139 Some(e) => Err(e),
140 None => Ok(())
141 }
142 })
143 }
144
145 fn inner(&self) -> MutexGuard<StreamInner> {
146 self.imp.inner()
147 }
148
149 fn post_register(&self, interest: EventSet, me: &mut StreamInner) {
150 if interest.is_readable() {
151 self.imp.schedule_read(me);
152 }
153
154 if interest.is_writable() {
158 if let State::Empty = me.write {
159 me.iocp.defer(EventSet::writable());
160 }
161 }
162 }
163}
164
165impl StreamImp {
166 fn inner(&self) -> MutexGuard<StreamInner> {
167 self.inner.inner.lock().unwrap()
168 }
169
170 fn schedule_connect(&self, addr: &SocketAddr, me: &mut StreamInner)
171 -> io::Result<()> {
172 unsafe {
173 trace!("scheduling a connect");
174 try!(me.socket.connect_overlapped(addr, self.inner.read.get_mut()));
175 }
176 mem::forget(self.clone());
178 Ok(())
179 }
180
181 fn schedule_read(&self, me: &mut StreamInner) {
190 match me.read {
191 State::Empty => {}
192 _ => return,
193 }
194
195 me.iocp.unset_readiness(EventSet::readable());
196
197 let mut buf = me.iocp.get_buffer(64 * 1024);
198 let res = unsafe {
199 trace!("scheduling a read");
200 let cap = buf.capacity();
201 buf.set_len(cap);
202 me.socket.read_overlapped(&mut buf, self.inner.read.get_mut())
203 };
204 match res {
205 Ok(_) => {
206 me.read = State::Pending(buf);
208 mem::forget(self.clone());
209 }
210 Err(e) => {
211 let mut set = EventSet::readable();
214 if e.raw_os_error() == Some(WSAECONNRESET as i32) {
215 set = set | EventSet::hup();
216 }
217 me.read = State::Error(e);
218 me.iocp.defer(set);
219 me.iocp.put_buffer(buf);
220 }
221 }
222 }
223
224 fn schedule_write(&self, buf: Vec<u8>, pos: usize,
234 me: &mut StreamInner) {
235
236 me.iocp.unset_readiness(EventSet::writable());
238
239 trace!("scheduling a write");
240 let err = unsafe {
241 me.socket.write_overlapped(&buf[pos..], self.inner.write.get_mut())
242 };
243 match err {
244 Ok(_) => {
245 me.write = State::Pending((buf, pos));
247 mem::forget(self.clone());
248 }
249 Err(e) => {
250 me.write = State::Error(e);
251 me.iocp.defer(EventSet::writable());
252 me.iocp.put_buffer(buf);
253 }
254 }
255 }
256
257 fn push(&self, me: &mut StreamInner, set: EventSet,
263 into: &mut Vec<IoEvent>) {
264 if me.socket.as_raw_socket() != INVALID_SOCKET {
265 me.iocp.push_event(set, into);
266 }
267 }
268}
269
270fn read_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
271 let me2 = StreamImp {
272 inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
273 };
274
275 let mut me = me2.inner();
276 match mem::replace(&mut me.read, State::Empty) {
277 State::Pending(mut buf) => {
278 trace!("finished a read: {}", status.bytes_transferred());
279
280 unsafe {
281 buf.set_len(status.bytes_transferred() as usize);
282 }
283
284 me.read = State::Ready(Cursor::new(buf));
285
286 let mut e = EventSet::readable();
289
290 if status.bytes_transferred() == 0 {
291 e = e | EventSet::hup();
292 }
293
294 return me2.push(&mut me, e, dst)
295 }
296 s => me.read = s,
297 }
298
299 trace!("finished a connect");
301 me2.push(&mut me, EventSet::writable(), dst);
302 me2.schedule_read(&mut me);
303}
304
305fn write_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
306 trace!("finished a write {}", status.bytes_transferred());
307 let me2 = StreamImp {
308 inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
309 };
310 let mut me = me2.inner();
311 let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
312 State::Pending(pair) => pair,
313 _ => unreachable!(),
314 };
315 let new_pos = pos + (status.bytes_transferred() as usize);
316 if new_pos == buf.len() {
317 me2.push(&mut me, EventSet::writable(), dst);
318 } else {
319 me2.schedule_write(buf, new_pos, &mut me);
320 }
321}
322
323impl Read for TcpStream {
324 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
325 let mut me = self.inner();
326
327 match mem::replace(&mut me.read, State::Empty) {
328 State::Empty => Err(wouldblock()),
329 State::Pending(buf) => {
330 me.read = State::Pending(buf);
331 Err(wouldblock())
332 }
333 State::Ready(mut cursor) => {
334 let amt = try!(cursor.read(buf));
335 if cursor.position() as usize == cursor.get_ref().len() {
338 me.iocp.put_buffer(cursor.into_inner());
339 self.imp.schedule_read(&mut me);
340 } else {
341 me.read = State::Ready(cursor);
342 }
343 Ok(amt)
344 }
345 State::Error(e) => {
346 self.imp.schedule_read(&mut me);
347 Err(e)
348 }
349 }
350 }
351}
352
353impl Write for TcpStream {
354 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
355 let mut me = self.inner();
356 let me = &mut *me;
357
358 match me.write {
359 State::Empty => {}
360 _ => return Err(wouldblock())
361 }
362
363 if me.iocp.port().is_none() {
364 return Err(wouldblock())
365 }
366
367 let mut intermediate = me.iocp.get_buffer(64 * 1024);
368 let amt = try!(intermediate.write(buf));
369 self.imp.schedule_write(intermediate, 0, me);
370 Ok(amt)
371 }
372
373 fn flush(&mut self) -> io::Result<()> {
374 Ok(())
375 }
376}
377
378impl Evented for TcpStream {
379 fn register(&self, selector: &mut Selector, token: Token,
380 interest: EventSet, opts: PollOpt) -> io::Result<()> {
381 let mut me = self.inner();
382 let me = &mut *me;
383 try!(me.iocp.register_socket(&me.socket, selector, token, interest,
384 opts));
385
386 if let Some(addr) = me.deferred_connect.take() {
391 return self.imp.schedule_connect(&addr, me).map(|_| ())
392 }
393 self.post_register(interest, me);
394 Ok(())
395 }
396
397 fn reregister(&self, selector: &mut Selector, token: Token,
398 interest: EventSet, opts: PollOpt) -> io::Result<()> {
399 let mut me = self.inner();
400 {
401 let me = &mut *me;
402 try!(me.iocp.reregister_socket(&me.socket, selector, token,
403 interest, opts));
404 }
405 self.post_register(interest, &mut me);
406 Ok(())
407 }
408
409 fn deregister(&self, selector: &mut Selector) -> io::Result<()> {
410 self.inner().iocp.checked_deregister(selector)
411 }
412}
413
414impl fmt::Debug for TcpStream {
415 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
416 "TcpStream { ... }".fmt(f)
417 }
418}
419
420impl Drop for TcpStream {
421 fn drop(&mut self) {
422 let mut inner = self.inner();
423 inner.socket = unsafe {
432 net::TcpStream::from_raw_socket(INVALID_SOCKET)
433 };
434
435 inner.iocp.deregister();
437 }
438}
439
440impl TcpListener {
441 pub fn new(socket: net::TcpListener, addr: &SocketAddr)
442 -> io::Result<TcpListener> {
443 Ok(TcpListener::new_family(socket, match *addr {
444 SocketAddr::V4(..) => Family::V4,
445 SocketAddr::V6(..) => Family::V6,
446 }))
447 }
448
449 fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
450 TcpListener {
451 imp: ListenerImp {
452 inner: FromRawArc::new(ListenerIo {
453 accept: Overlapped::new(accept_done),
454 inner: Mutex::new(ListenerInner {
455 socket: socket,
456 iocp: Registration::new(),
457 accept: State::Empty,
458 accept_buf: AcceptAddrsBuf::new(),
459 family: family,
460 }),
461 }),
462 },
463 }
464 }
465
466 pub fn accept(&self) -> io::Result<Option<(TcpStream, SocketAddr)>> {
467 let mut me = self.inner();
468
469 let ret = match mem::replace(&mut me.accept, State::Empty) {
470 State::Empty => return Ok(None),
471 State::Pending(t) => {
472 me.accept = State::Pending(t);
473 return Ok(None)
474 }
475 State::Ready((s, a)) => {
476 Ok(Some((TcpStream::new(s, None), a)))
477 }
478 State::Error(e) => Err(e),
479 };
480
481 self.imp.schedule_accept(&mut me);
482
483 return ret
484 }
485
486 pub fn local_addr(&self) -> io::Result<SocketAddr> {
487 self.inner().socket.local_addr()
488 }
489
490 pub fn try_clone(&self) -> io::Result<TcpListener> {
491 let inner = self.inner();
492 inner.socket.try_clone().map(|s| {
493 TcpListener::new_family(s, inner.family)
494 })
495 }
496
497 pub fn take_socket_error(&self) -> io::Result<()> {
498 net2::TcpListenerExt::take_error(&self.inner().socket).and_then(|e| {
499 match e {
500 Some(e) => Err(e),
501 None => Ok(())
502 }
503 })
504 }
505
506 fn inner(&self) -> MutexGuard<ListenerInner> {
507 self.imp.inner()
508 }
509}
510
511impl ListenerImp {
512 fn inner(&self) -> MutexGuard<ListenerInner> {
513 self.inner.inner.lock().unwrap()
514 }
515
516 fn schedule_accept(&self, me: &mut ListenerInner) {
517 match me.accept {
518 State::Empty => {}
519 _ => return
520 }
521
522 me.iocp.unset_readiness(EventSet::readable());
523
524 let res = match me.family {
525 Family::V4 => TcpBuilder::new_v4(),
526 Family::V6 => TcpBuilder::new_v6(),
527 }.and_then(|builder| unsafe {
528 trace!("scheduling an accept");
529 me.socket.accept_overlapped(&builder, &mut me.accept_buf,
530 self.inner.accept.get_mut())
531 });
532 match res {
533 Ok((socket, _)) => {
534 me.accept = State::Pending(socket);
536 mem::forget(self.clone());
537 }
538 Err(e) => {
539 me.accept = State::Error(e);
540 me.iocp.defer(EventSet::readable());
541 }
542 }
543 }
544
545 fn push(&self, me: &mut ListenerInner, set: EventSet,
547 into: &mut Vec<IoEvent>) {
548 if me.socket.as_raw_socket() != INVALID_SOCKET {
549 me.iocp.push_event(set, into);
550 }
551 }
552}
553
554fn accept_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
555 let me2 = ListenerImp {
556 inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
557 };
558
559 let mut me = me2.inner();
560 let socket = match mem::replace(&mut me.accept, State::Empty) {
561 State::Pending(s) => s,
562 _ => unreachable!(),
563 };
564 trace!("finished an accept");
565 me.accept = match me.accept_buf.parse(&me.socket) {
566 Ok(buf) => {
567 if let Some(remote_addr) = buf.remote() {
568 State::Ready((socket, remote_addr))
569 } else {
570 State::Error(io::Error::new(ErrorKind::Other,
571 "Could not obtain remote address"))
572 }
573 }
574 Err(e) => State::Error(e),
575 };
576 me2.push(&mut me, EventSet::readable(), dst);
577}
578
579impl Evented for TcpListener {
580 fn register(&self, selector: &mut Selector, token: Token,
581 interest: EventSet, opts: PollOpt) -> io::Result<()> {
582 let mut me = self.inner();
583 let me = &mut *me;
584 try!(me.iocp.register_socket(&me.socket, selector, token, interest,
585 opts));
586 self.imp.schedule_accept(me);
587 Ok(())
588 }
589
590 fn reregister(&self, selector: &mut Selector, token: Token,
591 interest: EventSet, opts: PollOpt) -> io::Result<()> {
592 let mut me = self.inner();
593 let me = &mut *me;
594 try!(me.iocp.reregister_socket(&me.socket, selector, token,
595 interest, opts));
596 self.imp.schedule_accept(me);
597 Ok(())
598 }
599
600 fn deregister(&self, selector: &mut Selector) -> io::Result<()> {
601 self.inner().iocp.checked_deregister(selector)
602 }
603}
604
605impl fmt::Debug for TcpListener {
606 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
607 "TcpListener { ... }".fmt(f)
608 }
609}
610
611impl Drop for TcpListener {
612 fn drop(&mut self) {
613 let mut inner = self.inner();
614
615 inner.socket = unsafe {
617 net::TcpListener::from_raw_socket(INVALID_SOCKET)
618 };
619
620 inner.iocp.deregister();
622 }
623}