tmp_mio/sys/windows/
tcp.rs

1use std::fmt;
2use std::io::{self, Read, Write, Cursor, ErrorKind};
3use std::mem;
4use std::net::{self, SocketAddr};
5use std::os::windows::prelude::*;
6use std::sync::{Mutex, MutexGuard};
7
8use net2::{self, TcpBuilder};
9use net::tcp::Shutdown;
10use miow::iocp::CompletionStatus;
11use miow::net::*;
12use winapi::*;
13
14use {Evented, EventSet, PollOpt, Selector, Token};
15use event::IoEvent;
16use sys::windows::selector::{Overlapped, Registration};
17use sys::windows::{wouldblock, Family};
18use sys::windows::from_raw_arc::FromRawArc;
19
/// A TCP stream whose connect, read and write operations are issued as
/// overlapped I/O (see `StreamImp`).
pub struct TcpStream {
    /// Separately stored implementation to ensure that the `Drop`
    /// implementation on this type is only executed when it's actually dropped
    /// (many clones of this `imp` are made).
    imp: StreamImp,
}
26
/// A TCP listener whose accept operations are issued as overlapped I/O (see
/// `ListenerImp`).
pub struct TcpListener {
    /// Cloneable implementation handle, kept separate from this type so that
    /// cloning it does not run `TcpListener`'s `Drop` implementation.
    imp: ListenerImp,
}
30
/// Cloneable handle to the shared state of a `TcpStream`.
#[derive(Clone)]
struct StreamImp {
    /// A stable address and synchronized access for all internals. This serves
    /// to ensure that all `Overlapped` pointers are valid for a long period of
    /// time as well as allowing completion callbacks to have access to the
    /// internals without having ownership.
    ///
    /// Note that the reference count also allows us to "loan out" copies to
    /// completion ports while I/O is running to guarantee that this stays alive
    /// until the I/O completes. You'll notice a number of calls to
    /// `mem::forget` below, and these only happen on successful scheduling of
    /// I/O and are paired with `overlapped2arc!` macro invocations in the
    /// completion callbacks (to have a decrement match the increment).
    inner: FromRawArc<StreamIo>,
}
46
/// Cloneable handle to the shared state of a `TcpListener`.
#[derive(Clone)]
struct ListenerImp {
    /// Shared listener state. As with `StreamImp::inner`, a clone is forgotten
    /// when an accept is scheduled and recovered with `overlapped2arc!` in
    /// `accept_done`.
    inner: FromRawArc<ListenerIo>,
}
51
/// Shared allocation behind a `StreamImp`: the locked mutable state plus the
/// `Overlapped` structures handed to the OS for in-flight operations.
struct StreamIo {
    /// Mutable stream state, guarded by a mutex.
    inner: Mutex<StreamInner>,
    /// Overlapped structure for reads; its completion callback is `read_done`.
    read: Overlapped, // also used for connect
    /// Overlapped structure for writes; its completion callback is
    /// `write_done`.
    write: Overlapped,
}
57
/// Shared allocation behind a `ListenerImp`: the locked mutable state plus the
/// `Overlapped` structure used for in-flight accepts.
struct ListenerIo {
    /// Mutable listener state, guarded by a mutex.
    inner: Mutex<ListenerInner>,
    /// Overlapped structure for accepts; its completion callback is
    /// `accept_done`.
    accept: Overlapped,
}
62
/// Mutable state of a stream; only accessed through the mutex in `StreamIo`.
struct StreamInner {
    /// The underlying socket. Replaced by a handle wrapping `INVALID_SOCKET`
    /// when the owning `TcpStream` is dropped.
    socket: net::TcpStream,
    /// Registration with the selector; also the source of the buffers used
    /// for reads and writes (`get_buffer` / `put_buffer`).
    iocp: Registration,
    /// Address to connect to once the stream is registered, if any. Taken
    /// (set to `None`) by `Evented::register`.
    deferred_connect: Option<SocketAddr>,
    /// Read state: `Pending` holds the buffer lent to the in-flight read,
    /// `Ready` holds a cursor over the bytes that were received.
    read: State<Vec<u8>, Cursor<Vec<u8>>>,
    /// Write state: `Pending` holds the buffer being written and the offset up
    /// to which it has already been written.
    // NOTE(review): nothing in this file constructs `Ready` for writes.
    write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
}
70
/// Mutable state of a listener; only accessed through the mutex in
/// `ListenerIo`.
struct ListenerInner {
    /// The underlying listening socket. Replaced by a handle wrapping
    /// `INVALID_SOCKET` when the owning `TcpListener` is dropped.
    socket: net::TcpListener,
    /// Address family used to create the socket for each scheduled accept.
    family: Family,
    /// Registration with the selector.
    iocp: Registration,
    /// Accept state: `Pending` holds the socket passed to the in-flight
    /// accept, `Ready` holds that socket together with the peer address.
    accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
    /// Buffer passed to `accept_overlapped` and parsed in `accept_done` to
    /// obtain the remote address.
    accept_buf: AcceptAddrsBuf,
}
78
/// State of one kind of overlapped operation (read, write or accept).
///
/// `T` is the data kept while the operation is in flight and `U` is the data
/// made available once it has completed.
enum State<T, U> {
    Empty,              // no I/O operation in progress
    Pending(T),         // an I/O operation is in progress
    Ready(U),           // I/O has finished with this value
    Error(io::Error),   // there was an I/O error
}
85
86impl TcpStream {
87    fn new(socket: net::TcpStream,
88           deferred_connect: Option<SocketAddr>) -> TcpStream {
89        TcpStream {
90            imp: StreamImp {
91                inner: FromRawArc::new(StreamIo {
92                    read: Overlapped::new(read_done),
93                    write: Overlapped::new(write_done),
94                    inner: Mutex::new(StreamInner {
95                        socket: socket,
96                        iocp: Registration::new(),
97                        deferred_connect: deferred_connect,
98                        read: State::Empty,
99                        write: State::Empty,
100                    }),
101                }),
102            },
103        }
104    }
105
106    pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
107                   -> io::Result<TcpStream> {
108        Ok(TcpStream::new(socket, Some(*addr)))
109    }
110
111    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
112        self.inner().socket.peer_addr()
113    }
114
115    pub fn local_addr(&self) -> io::Result<SocketAddr> {
116        self.inner().socket.local_addr()
117    }
118
119    pub fn try_clone(&self) -> io::Result<TcpStream> {
120        self.inner().socket.try_clone().map(|s| TcpStream::new(s, None))
121    }
122
123    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
124        self.inner().socket.shutdown(how)
125    }
126
127    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
128        net2::TcpStreamExt::set_nodelay(&self.inner().socket, nodelay)
129    }
130
131    pub fn set_keepalive(&self, seconds: Option<u32>) -> io::Result<()> {
132        let dur = seconds.map(|s| s * 1000);
133        net2::TcpStreamExt::set_keepalive_ms(&self.inner().socket, dur)
134    }
135
136    pub fn take_socket_error(&self) -> io::Result<()> {
137        net2::TcpStreamExt::take_error(&self.inner().socket).and_then(|e| {
138            match e {
139                Some(e) => Err(e),
140                None => Ok(())
141            }
142        })
143    }
144
145    fn inner(&self) -> MutexGuard<StreamInner> {
146        self.imp.inner()
147    }
148
149    fn post_register(&self, interest: EventSet, me: &mut StreamInner) {
150        if interest.is_readable() {
151            self.imp.schedule_read(me);
152        }
153
154        // At least with epoll, if a socket is registered with an interest in
155        // writing and it's immediately writable then a writable event is
156        // generated immediately, so do so here.
157        if interest.is_writable() {
158            if let State::Empty = me.write {
159                me.iocp.defer(EventSet::writable());
160            }
161        }
162    }
163}
164
165impl StreamImp {
166    fn inner(&self) -> MutexGuard<StreamInner> {
167        self.inner.inner.lock().unwrap()
168    }
169
170    fn schedule_connect(&self, addr: &SocketAddr, me: &mut StreamInner)
171                        -> io::Result<()> {
172        unsafe {
173            trace!("scheduling a connect");
174            try!(me.socket.connect_overlapped(addr, self.inner.read.get_mut()));
175        }
176        // see docs above on StreamImp.inner for rationale on forget
177        mem::forget(self.clone());
178        Ok(())
179    }
180
181    /// Issues a "read" operation for this socket, if applicable.
182    ///
183    /// This is intended to be invoked from either a completion callback or a
184    /// normal context. The function is infallible because errors are stored
185    /// internally to be returned later.
186    ///
187    /// It is required that this function is only called after the handle has
188    /// been registered with an event loop.
189    fn schedule_read(&self, me: &mut StreamInner) {
190        match me.read {
191            State::Empty => {}
192            _ => return,
193        }
194
195        me.iocp.unset_readiness(EventSet::readable());
196
197        let mut buf = me.iocp.get_buffer(64 * 1024);
198        let res = unsafe {
199            trace!("scheduling a read");
200            let cap = buf.capacity();
201            buf.set_len(cap);
202            me.socket.read_overlapped(&mut buf, self.inner.read.get_mut())
203        };
204        match res {
205            Ok(_) => {
206                // see docs above on StreamImp.inner for rationale on forget
207                me.read = State::Pending(buf);
208                mem::forget(self.clone());
209            }
210            Err(e) => {
211                // Like above, be sure to indicate that hup has happened
212                // whenever we get `ECONNRESET`
213                let mut set = EventSet::readable();
214                if e.raw_os_error() == Some(WSAECONNRESET as i32) {
215                    set = set | EventSet::hup();
216                }
217                me.read = State::Error(e);
218                me.iocp.defer(set);
219                me.iocp.put_buffer(buf);
220            }
221        }
222    }
223
224    /// Similar to `schedule_read`, except that this issues, well, writes.
225    ///
226    /// This function will continually attempt to write the entire contents of
227    /// the buffer `buf` until they have all been written. The `pos` argument is
228    /// the current offset within the buffer up to which the contents have
229    /// already been written.
230    ///
231    /// A new writable event (e.g. allowing another write) will only happen once
232    /// the buffer has been written completely (or hit an error).
233    fn schedule_write(&self, buf: Vec<u8>, pos: usize,
234                      me: &mut StreamInner) {
235
236        // About to write, clear any pending level triggered events
237        me.iocp.unset_readiness(EventSet::writable());
238
239        trace!("scheduling a write");
240        let err = unsafe {
241            me.socket.write_overlapped(&buf[pos..], self.inner.write.get_mut())
242        };
243        match err {
244            Ok(_) => {
245                // see docs above on StreamImp.inner for rationale on forget
246                me.write = State::Pending((buf, pos));
247                mem::forget(self.clone());
248            }
249            Err(e) => {
250                me.write = State::Error(e);
251                me.iocp.defer(EventSet::writable());
252                me.iocp.put_buffer(buf);
253            }
254        }
255    }
256
257    /// Pushes an event for this socket onto the selector its registered for.
258    ///
259    /// When an event is generated on this socket, if it happened after the
260    /// socket was closed then we don't want to actually push the event onto our
261    /// selector as otherwise it's just a spurious notification.
262    fn push(&self, me: &mut StreamInner, set: EventSet,
263            into: &mut Vec<IoEvent>) {
264        if me.socket.as_raw_socket() != INVALID_SOCKET {
265            me.iocp.push_event(set, into);
266        }
267    }
268}
269
270fn read_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
271    let me2 = StreamImp {
272        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
273    };
274
275    let mut me = me2.inner();
276    match mem::replace(&mut me.read, State::Empty) {
277        State::Pending(mut buf) => {
278            trace!("finished a read: {}", status.bytes_transferred());
279
280            unsafe {
281                buf.set_len(status.bytes_transferred() as usize);
282            }
283
284            me.read = State::Ready(Cursor::new(buf));
285
286            // If we transferred 0 bytes then be sure to indicate that hup
287            // happened.
288            let mut e = EventSet::readable();
289
290            if status.bytes_transferred() == 0 {
291                e = e | EventSet::hup();
292            }
293
294            return me2.push(&mut me, e, dst)
295        }
296        s => me.read = s,
297    }
298
299    // If a read didn't complete, then the connect must have just finished.
300    trace!("finished a connect");
301    me2.push(&mut me, EventSet::writable(), dst);
302    me2.schedule_read(&mut me);
303}
304
305fn write_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
306    trace!("finished a write {}", status.bytes_transferred());
307    let me2 = StreamImp {
308        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
309    };
310    let mut me = me2.inner();
311    let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
312        State::Pending(pair) => pair,
313        _ => unreachable!(),
314    };
315    let new_pos = pos + (status.bytes_transferred() as usize);
316    if new_pos == buf.len() {
317        me2.push(&mut me, EventSet::writable(), dst);
318    } else {
319        me2.schedule_write(buf, new_pos, &mut me);
320    }
321}
322
323impl Read for TcpStream {
324    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
325        let mut me = self.inner();
326
327        match mem::replace(&mut me.read, State::Empty) {
328            State::Empty => Err(wouldblock()),
329            State::Pending(buf) => {
330                me.read = State::Pending(buf);
331                Err(wouldblock())
332            }
333            State::Ready(mut cursor) => {
334                let amt = try!(cursor.read(buf));
335                // Once the entire buffer is written we need to schedule the
336                // next read operation.
337                if cursor.position() as usize == cursor.get_ref().len() {
338                    me.iocp.put_buffer(cursor.into_inner());
339                    self.imp.schedule_read(&mut me);
340                } else {
341                    me.read = State::Ready(cursor);
342                }
343                Ok(amt)
344            }
345            State::Error(e) => {
346                self.imp.schedule_read(&mut me);
347                Err(e)
348            }
349        }
350    }
351}
352
353impl Write for TcpStream {
354    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
355        let mut me = self.inner();
356        let me = &mut *me;
357
358        match me.write {
359            State::Empty => {}
360            _ => return Err(wouldblock())
361        }
362
363        if me.iocp.port().is_none() {
364            return Err(wouldblock())
365        }
366
367        let mut intermediate = me.iocp.get_buffer(64 * 1024);
368        let amt = try!(intermediate.write(buf));
369        self.imp.schedule_write(intermediate, 0, me);
370        Ok(amt)
371    }
372
373    fn flush(&mut self) -> io::Result<()> {
374        Ok(())
375    }
376}
377
378impl Evented for TcpStream {
379    fn register(&self, selector: &mut Selector, token: Token,
380                interest: EventSet, opts: PollOpt) -> io::Result<()> {
381        let mut me = self.inner();
382        let me = &mut *me;
383        try!(me.iocp.register_socket(&me.socket, selector, token, interest,
384                                     opts));
385
386        // If we were connected before being registered process that request
387        // here and go along our merry ways. Note that the callback for a
388        // successful connect will worry about generating writable/readable
389        // events and scheduling a new read.
390        if let Some(addr) = me.deferred_connect.take() {
391            return self.imp.schedule_connect(&addr, me).map(|_| ())
392        }
393        self.post_register(interest, me);
394        Ok(())
395    }
396
397    fn reregister(&self, selector: &mut Selector, token: Token,
398                  interest: EventSet, opts: PollOpt) -> io::Result<()> {
399        let mut me = self.inner();
400        {
401            let me = &mut *me;
402            try!(me.iocp.reregister_socket(&me.socket, selector, token,
403                                           interest, opts));
404        }
405        self.post_register(interest, &mut me);
406        Ok(())
407    }
408
409    fn deregister(&self, selector: &mut Selector) -> io::Result<()> {
410        self.inner().iocp.checked_deregister(selector)
411    }
412}
413
414impl fmt::Debug for TcpStream {
415    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
416        "TcpStream { ... }".fmt(f)
417    }
418}
419
420impl Drop for TcpStream {
421    fn drop(&mut self) {
422        let mut inner = self.inner();
423        // When the `TcpSocket` itself is dropped then we close the internal
424        // handle (e.g. call `closesocket`). This will cause all pending I/O
425        // operations to forcibly finish and we'll get notifications for all of
426        // them and clean up the rest of our internal state (yay!).
427        //
428        // This is achieved by replacing our socket with an invalid one, so all
429        // further operations will return an error (but no further operations
430        // should be done anyway).
431        inner.socket = unsafe {
432            net::TcpStream::from_raw_socket(INVALID_SOCKET)
433        };
434
435        // Then run any finalization code including level notifications
436        inner.iocp.deregister();
437    }
438}
439
440impl TcpListener {
441    pub fn new(socket: net::TcpListener, addr: &SocketAddr)
442               -> io::Result<TcpListener> {
443        Ok(TcpListener::new_family(socket, match *addr {
444            SocketAddr::V4(..) => Family::V4,
445            SocketAddr::V6(..) => Family::V6,
446        }))
447    }
448
449    fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
450        TcpListener {
451            imp: ListenerImp {
452                inner: FromRawArc::new(ListenerIo {
453                    accept: Overlapped::new(accept_done),
454                    inner: Mutex::new(ListenerInner {
455                        socket: socket,
456                        iocp: Registration::new(),
457                        accept: State::Empty,
458                        accept_buf: AcceptAddrsBuf::new(),
459                        family: family,
460                    }),
461                }),
462            },
463        }
464    }
465
466    pub fn accept(&self) -> io::Result<Option<(TcpStream, SocketAddr)>> {
467        let mut me = self.inner();
468
469        let ret = match mem::replace(&mut me.accept, State::Empty) {
470            State::Empty => return Ok(None),
471            State::Pending(t) => {
472                me.accept = State::Pending(t);
473                return Ok(None)
474            }
475            State::Ready((s, a)) => {
476                Ok(Some((TcpStream::new(s, None), a)))
477            }
478            State::Error(e) => Err(e),
479        };
480
481        self.imp.schedule_accept(&mut me);
482
483        return ret
484    }
485
486    pub fn local_addr(&self) -> io::Result<SocketAddr> {
487        self.inner().socket.local_addr()
488    }
489
490    pub fn try_clone(&self) -> io::Result<TcpListener> {
491        let inner = self.inner();
492        inner.socket.try_clone().map(|s| {
493            TcpListener::new_family(s, inner.family)
494        })
495    }
496
497    pub fn take_socket_error(&self) -> io::Result<()> {
498        net2::TcpListenerExt::take_error(&self.inner().socket).and_then(|e| {
499            match e {
500                Some(e) => Err(e),
501                None => Ok(())
502            }
503        })
504    }
505
506    fn inner(&self) -> MutexGuard<ListenerInner> {
507        self.imp.inner()
508    }
509}
510
511impl ListenerImp {
512    fn inner(&self) -> MutexGuard<ListenerInner> {
513        self.inner.inner.lock().unwrap()
514    }
515
516    fn schedule_accept(&self, me: &mut ListenerInner) {
517        match me.accept {
518            State::Empty => {}
519            _ => return
520        }
521
522        me.iocp.unset_readiness(EventSet::readable());
523
524        let res = match me.family {
525            Family::V4 => TcpBuilder::new_v4(),
526            Family::V6 => TcpBuilder::new_v6(),
527        }.and_then(|builder| unsafe {
528            trace!("scheduling an accept");
529            me.socket.accept_overlapped(&builder, &mut me.accept_buf,
530                                        self.inner.accept.get_mut())
531        });
532        match res {
533            Ok((socket, _)) => {
534                // see docs above on StreamImp.inner for rationale on forget
535                me.accept = State::Pending(socket);
536                mem::forget(self.clone());
537            }
538            Err(e) => {
539                me.accept = State::Error(e);
540                me.iocp.defer(EventSet::readable());
541            }
542        }
543    }
544
545    // See comments in StreamImp::push
546    fn push(&self, me: &mut ListenerInner, set: EventSet,
547            into: &mut Vec<IoEvent>) {
548        if me.socket.as_raw_socket() != INVALID_SOCKET {
549            me.iocp.push_event(set, into);
550        }
551    }
552}
553
554fn accept_done(status: &CompletionStatus, dst: &mut Vec<IoEvent>) {
555    let me2 = ListenerImp {
556        inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
557    };
558
559    let mut me = me2.inner();
560    let socket = match mem::replace(&mut me.accept, State::Empty) {
561        State::Pending(s) => s,
562        _ => unreachable!(),
563    };
564    trace!("finished an accept");
565    me.accept = match me.accept_buf.parse(&me.socket) {
566        Ok(buf) => {
567            if let Some(remote_addr) = buf.remote() {
568                State::Ready((socket, remote_addr))
569            } else {
570                State::Error(io::Error::new(ErrorKind::Other,
571                                            "Could not obtain remote address"))
572            }
573        }
574        Err(e) => State::Error(e),
575    };
576    me2.push(&mut me, EventSet::readable(), dst);
577}
578
579impl Evented for TcpListener {
580    fn register(&self, selector: &mut Selector, token: Token,
581                interest: EventSet, opts: PollOpt) -> io::Result<()> {
582        let mut me = self.inner();
583        let me = &mut *me;
584        try!(me.iocp.register_socket(&me.socket, selector, token, interest,
585                                     opts));
586        self.imp.schedule_accept(me);
587        Ok(())
588    }
589
590    fn reregister(&self, selector: &mut Selector, token: Token,
591                  interest: EventSet, opts: PollOpt) -> io::Result<()> {
592        let mut me = self.inner();
593        let me = &mut *me;
594        try!(me.iocp.reregister_socket(&me.socket, selector, token,
595                                       interest, opts));
596        self.imp.schedule_accept(me);
597        Ok(())
598    }
599
600    fn deregister(&self, selector: &mut Selector) -> io::Result<()> {
601        self.inner().iocp.checked_deregister(selector)
602    }
603}
604
605impl fmt::Debug for TcpListener {
606    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
607        "TcpListener { ... }".fmt(f)
608    }
609}
610
611impl Drop for TcpListener {
612    fn drop(&mut self) {
613        let mut inner = self.inner();
614
615        // See comments in TcpStream
616        inner.socket = unsafe {
617            net::TcpListener::from_raw_socket(INVALID_SOCKET)
618        };
619
620        // Then run any finalization code including level notifications
621        inner.iocp.deregister();
622    }
623}