hawk_ws/
connection.rs

1use std::borrow::Borrow;
2use std::collections::VecDeque;
3use std::io::{Cursor, Read, Seek, SeekFrom, Write};
4use std::mem::replace;
5use std::net::SocketAddr;
6use std::str::from_utf8;
7
8use mio::tcp::TcpStream;
9use mio::timer::Timeout;
10use mio::{Ready, Token};
11use url;
12
13#[cfg(feature = "ssl")]
14use openssl::ssl::HandshakeError;
15
16use frame::Frame;
17use handler::Handler;
18use handshake::{Handshake, Request, Response};
19use message::Message;
20use protocol::{CloseCode, OpCode};
21use result::{Error, Kind, Result};
22use stream::{Stream, TryReadBuf, TryWriteBuf};
23
24use self::Endpoint::*;
25use self::State::*;
26
27use super::Settings;
28
29#[derive(Debug)]
30pub enum State {
31    // Tcp connection accepted, waiting for handshake to complete
32    Connecting(Cursor<Vec<u8>>, Cursor<Vec<u8>>),
33    // Ready to send/receive messages
34    Open,
35    AwaitingClose,
36    RespondingClose,
37    FinishedClose,
38}
39
40/// A little more semantic than a boolean
41#[derive(Debug, Eq, PartialEq, Clone)]
42pub enum Endpoint {
43    /// Will mask outgoing frames
44    Client(url::Url),
45    /// Won't mask outgoing frames
46    Server,
47}
48
49impl State {
50    #[inline]
51    pub fn is_connecting(&self) -> bool {
52        match *self {
53            State::Connecting(..) => true,
54            _ => false,
55        }
56    }
57
58    #[allow(dead_code)]
59    #[inline]
60    pub fn is_open(&self) -> bool {
61        match *self {
62            State::Open => true,
63            _ => false,
64        }
65    }
66
67    #[inline]
68    pub fn is_closing(&self) -> bool {
69        match *self {
70            State::AwaitingClose => true,
71            State::FinishedClose => true,
72            _ => false,
73        }
74    }
75}
76
77pub struct Connection<H>
78where
79    H: Handler,
80{
81    token: Token,
82    socket: Stream,
83    state: State,
84    endpoint: Endpoint,
85    events: Ready,
86
87    fragments: VecDeque<Frame>,
88
89    in_buffer: Cursor<Vec<u8>>,
90    out_buffer: Cursor<Vec<u8>>,
91
92    handler: H,
93
94    addresses: Vec<SocketAddr>,
95
96    settings: Settings,
97    connection_id: u32,
98}
99
100impl<H> Connection<H>
101where
102    H: Handler,
103{
104    pub fn new(
105        tok: Token,
106        sock: TcpStream,
107        handler: H,
108        settings: Settings,
109        connection_id: u32,
110    ) -> Connection<H> {
111        Connection {
112            token: tok,
113            socket: Stream::tcp(sock),
114            state: Connecting(
115                Cursor::new(Vec::with_capacity(2048)),
116                Cursor::new(Vec::with_capacity(2048)),
117            ),
118            endpoint: Endpoint::Server,
119            events: Ready::empty(),
120            fragments: VecDeque::with_capacity(settings.fragments_capacity),
121            in_buffer: Cursor::new(Vec::with_capacity(settings.in_buffer_capacity)),
122            out_buffer: Cursor::new(Vec::with_capacity(settings.out_buffer_capacity)),
123            handler: handler,
124            addresses: Vec::new(),
125            settings: settings,
126            connection_id: connection_id,
127        }
128    }
129
130    pub fn as_server(&mut self) -> Result<()> {
131        Ok(self.events.insert(Ready::readable()))
132    }
133
134    pub fn as_client(&mut self, url: url::Url, addrs: Vec<SocketAddr>) -> Result<()> {
135        if let Connecting(ref mut req_buf, _) = self.state {
136            let req = self.handler.build_request(&url)?;
137            self.addresses = addrs;
138            self.events.insert(Ready::writable());
139            self.endpoint = Endpoint::Client(url);
140            req.format(req_buf.get_mut())
141        } else {
142            Err(Error::new(
143                Kind::Internal,
144                "Tried to set connection to client while not connecting.",
145            ))
146        }
147    }
148
149    #[cfg(feature = "ssl")]
150    pub fn encrypt(&mut self) -> Result<()> {
151        let sock = try!(self.socket().try_clone());
152        let ssl_stream = match self.endpoint {
153            Server => self.handler.upgrade_ssl_server(sock),
154            Client(ref url) => self.handler.upgrade_ssl_client(sock, url),
155        };
156
157        match ssl_stream {
158            Ok(stream) => Ok(self.socket = Stream::tls_live(stream)),
159            Err(Error {
160                kind: Kind::SslHandshake(handshake_err),
161                details,
162            }) => match handshake_err {
163                HandshakeError::SetupFailure(_) => {
164                    Err(Error::new(Kind::SslHandshake(handshake_err), details))
165                }
166                HandshakeError::Failure(mid) | HandshakeError::Interrupted(mid) => {
167                    Ok(self.socket = Stream::tls(mid))
168                }
169            },
170            Err(e) => Err(e),
171        }
172    }
173
174    pub fn token(&self) -> Token {
175        self.token
176    }
177
178    pub fn socket(&self) -> &TcpStream {
179        self.socket.evented()
180    }
181
182    pub fn connection_id(&self) -> u32 {
183        self.connection_id
184    }
185
186    fn peer_addr(&self) -> String {
187        if let Ok(addr) = self.socket.peer_addr() {
188            addr.to_string()
189        } else {
190            "UNKNOWN".into()
191        }
192    }
193
194    // Resetting may be necessary in order to try all possible addresses for a server
195    #[cfg(feature = "ssl")]
196    pub fn reset(&mut self) -> Result<()> {
197        // if self.is_client() {
198        if let Client(ref url) = self.endpoint {
199            if let Connecting(ref mut req, ref mut res) = self.state {
200                req.set_position(0);
201                res.set_position(0);
202                self.events.remove(Ready::readable());
203                self.events.insert(Ready::writable());
204
205                if let Some(ref addr) = self.addresses.pop() {
206                    let sock = try!(TcpStream::connect(addr));
207                    if self.socket.is_tls() {
208                        let ssl_stream = self.handler.upgrade_ssl_client(sock, url);
209                        match ssl_stream {
210                            Ok(stream) => Ok(self.socket = Stream::tls_live(stream)),
211                            Err(Error {
212                                kind: Kind::SslHandshake(handshake_err),
213                                details,
214                            }) => match handshake_err {
215                                HandshakeError::SetupFailure(_) => {
216                                    Err(Error::new(Kind::SslHandshake(handshake_err), details))
217                                }
218                                HandshakeError::Failure(mid) | HandshakeError::Interrupted(mid) => {
219                                    Ok(self.socket = Stream::tls(mid))
220                                }
221                            },
222                            Err(e) => Err(e),
223                        }
224                    } else {
225                        Ok(self.socket = Stream::tcp(sock))
226                    }
227                } else {
228                    if self.settings.panic_on_new_connection {
229                        panic!("Unable to connect to server.");
230                    }
231                    Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
232                }
233            } else {
234                Err(Error::new(
235                    Kind::Internal,
236                    "Unable to reset client connection because it is active.",
237                ))
238            }
239        } else {
240            Err(Error::new(
241                Kind::Internal,
242                "Server connections cannot be reset.",
243            ))
244        }
245    }
246
247    #[cfg(not(feature = "ssl"))]
248    pub fn reset(&mut self) -> Result<()> {
249        if self.is_client() {
250            if let Connecting(ref mut req, ref mut res) = self.state {
251                req.set_position(0);
252                res.set_position(0);
253                self.events.remove(Ready::readable());
254                self.events.insert(Ready::writable());
255
256                if let Some(ref addr) = self.addresses.pop() {
257                    let sock = try!(TcpStream::connect(addr));
258                    Ok(self.socket = Stream::tcp(sock))
259                } else {
260                    if self.settings.panic_on_new_connection {
261                        panic!("Unable to connect to server.");
262                    }
263                    Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
264                }
265            } else {
266                Err(Error::new(
267                    Kind::Internal,
268                    "Unable to reset client connection because it is active.",
269                ))
270            }
271        } else {
272            Err(Error::new(
273                Kind::Internal,
274                "Server connections cannot be reset.",
275            ))
276        }
277    }
278
279    pub fn events(&self) -> Ready {
280        self.events
281    }
282
283    pub fn is_client(&self) -> bool {
284        match self.endpoint {
285            Client(_) => true,
286            Server => false,
287        }
288    }
289
290    pub fn is_server(&self) -> bool {
291        match self.endpoint {
292            Client(_) => false,
293            Server => true,
294        }
295    }
296
297    pub fn shutdown(&mut self) {
298        self.handler.on_shutdown();
299        if let Err(err) = self.send_close(CloseCode::Away, "Shutting down.") {
300            self.handler.on_error(err);
301            self.disconnect()
302        }
303    }
304
305    #[inline]
306    pub fn new_timeout(&mut self, event: Token, timeout: Timeout) -> Result<()> {
307        self.handler.on_new_timeout(event, timeout)
308    }
309
310    #[inline]
311    pub fn timeout_triggered(&mut self, event: Token) -> Result<()> {
312        self.handler.on_timeout(event)
313    }
314
315    pub fn error(&mut self, err: Error) {
316        match self.state {
317            Connecting(_, ref mut res) => match err.kind {
318                #[cfg(feature = "ssl")]
319                Kind::Ssl(_) => {
320                    self.handler.on_error(err);
321                    self.events = Ready::empty();
322                }
323                Kind::Io(_) => {
324                    self.handler.on_error(err);
325                    self.events = Ready::empty();
326                }
327                Kind::Protocol => {
328                    let msg = err.to_string();
329                    self.handler.on_error(err);
330                    if let Server = self.endpoint {
331                        res.get_mut().clear();
332                        if let Err(err) =
333                            write!(res.get_mut(), "HTTP/1.1 400 Bad Request\r\n\r\n{}", msg)
334                        {
335                            self.handler.on_error(Error::from(err));
336                            self.events = Ready::empty();
337                        } else {
338                            self.events.remove(Ready::readable());
339                            self.events.insert(Ready::writable());
340                        }
341                    } else {
342                        self.events = Ready::empty();
343                    }
344                }
345                _ => {
346                    let msg = err.to_string();
347                    self.handler.on_error(err);
348                    if let Server = self.endpoint {
349                        res.get_mut().clear();
350                        if let Err(err) = write!(
351                            res.get_mut(),
352                            "HTTP/1.1 500 Internal Server Error\r\n\r\n{}",
353                            msg
354                        ) {
355                            self.handler.on_error(Error::from(err));
356                            self.events = Ready::empty();
357                        } else {
358                            self.events.remove(Ready::readable());
359                            self.events.insert(Ready::writable());
360                        }
361                    } else {
362                        self.events = Ready::empty();
363                    }
364                }
365            },
366            _ => {
367                match err.kind {
368                    Kind::Internal => {
369                        if self.settings.panic_on_internal {
370                            panic!("Panicking on internal error -- {}", err);
371                        }
372                        let reason = format!("{}", err);
373
374                        self.handler.on_error(err);
375                        if let Err(err) = self.send_close(CloseCode::Error, reason) {
376                            self.handler.on_error(err);
377                            self.disconnect()
378                        }
379                    }
380                    Kind::Capacity => {
381                        if self.settings.panic_on_capacity {
382                            panic!("Panicking on capacity error -- {}", err);
383                        }
384                        let reason = format!("{}", err);
385
386                        self.handler.on_error(err);
387                        if let Err(err) = self.send_close(CloseCode::Size, reason) {
388                            self.handler.on_error(err);
389                            self.disconnect()
390                        }
391                    }
392                    Kind::Protocol => {
393                        if self.settings.panic_on_protocol {
394                            panic!("Panicking on protocol error -- {}", err);
395                        }
396                        let reason = format!("{}", err);
397
398                        self.handler.on_error(err);
399                        if let Err(err) = self.send_close(CloseCode::Protocol, reason) {
400                            self.handler.on_error(err);
401                            self.disconnect()
402                        }
403                    }
404                    Kind::Encoding(_) => {
405                        if self.settings.panic_on_encoding {
406                            panic!("Panicking on encoding error -- {}", err);
407                        }
408                        let reason = format!("{}", err);
409
410                        self.handler.on_error(err);
411                        if let Err(err) = self.send_close(CloseCode::Invalid, reason) {
412                            self.handler.on_error(err);
413                            self.disconnect()
414                        }
415                    }
416                    Kind::Http(_) => {
417                        // This may happen if some handler writes a bad response
418                        self.handler.on_error(err);
419                        error!("Disconnecting WebSocket.");
420                        self.disconnect()
421                    }
422                    Kind::Custom(_) => {
423                        self.handler.on_error(err);
424                    }
425                    Kind::Timer(_) => {
426                        if self.settings.panic_on_timeout {
427                            panic!("Panicking on timer failure -- {}", err);
428                        }
429                        self.handler.on_error(err);
430                    }
431                    Kind::Queue(_) => {
432                        if self.settings.panic_on_queue {
433                            panic!("Panicking on queue error -- {}", err);
434                        }
435                        self.handler.on_error(err);
436                    }
437                    _ => {
438                        if self.settings.panic_on_io {
439                            panic!("Panicking on io error -- {}", err);
440                        }
441                        self.handler.on_error(err);
442                        self.disconnect()
443                    }
444                }
445            }
446        }
447    }
448
449    pub fn disconnect(&mut self) {
450        match self.state {
451            RespondingClose | FinishedClose | Connecting(_, _) => (),
452            _ => {
453                self.handler.on_close(CloseCode::Abnormal, "");
454            }
455        }
456        self.events = Ready::empty()
457    }
458
459    pub fn consume(self) -> H {
460        self.handler
461    }
462
463    fn write_handshake(&mut self) -> Result<()> {
464        if let Connecting(ref mut req, ref mut res) = self.state {
465            match self.endpoint {
466                Server => {
467                    let mut done = false;
468                    if let Some(_) = try!(self.socket.try_write_buf(res)) {
469                        if res.position() as usize == res.get_ref().len() {
470                            done = true
471                        }
472                    }
473                    if !done {
474                        return Ok(());
475                    }
476                }
477                Client(_) => {
478                    if let Some(_) = try!(self.socket.try_write_buf(req)) {
479                        if req.position() as usize == req.get_ref().len() {
480                            trace!(
481                                "Finished writing handshake request to {}",
482                                self.socket
483                                    .peer_addr()
484                                    .map(|addr| addr.to_string())
485                                    .unwrap_or("UNKNOWN".into())
486                            );
487                            self.events.insert(Ready::readable());
488                            self.events.remove(Ready::writable());
489                        }
490                    }
491                    return Ok(());
492                }
493            }
494        }
495
496        if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
497            trace!(
498                "Finished writing handshake response to {}",
499                self.peer_addr()
500            );
501
502            let request = match Request::parse(req.get_ref()) {
503                Ok(Some(req)) => req,
504                _ => {
505                    // An error should already have been sent for the first time it failed to
506                    // parse. We don't call disconnect here because `on_open` hasn't been called yet.
507                    self.state = FinishedClose;
508                    self.events = Ready::empty();
509                    return Ok(());
510                }
511            };
512
513            let response = try!(try!(Response::parse(res.get_ref())).ok_or(Error::new(
514                Kind::Internal,
515                "Failed to parse response after handshake is complete."
516            )));
517
518            if response.status() != 101 {
519                self.events = Ready::empty();
520                return Ok(());
521            } else {
522                try!(self.handler.on_open(Handshake {
523                    request: request,
524                    response: response,
525                    peer_addr: self.socket.peer_addr().ok(),
526                    local_addr: self.socket.local_addr().ok(),
527                }));
528                debug!("Connection to {} is now open.", self.peer_addr());
529                self.events.insert(Ready::readable());
530                return Ok(self.check_events());
531            }
532        } else {
533            Err(Error::new(
534                Kind::Internal,
535                "Tried to write WebSocket handshake while not in connecting state!",
536            ))
537        }
538    }
539
540    fn read_handshake(&mut self) -> Result<()> {
541        if let Connecting(ref mut req, ref mut res) = self.state {
542            match self.endpoint {
543                Server => {
544                    if let Some(_) = try!(self.socket.try_read_buf(req.get_mut())) {
545                        if let Some(ref request) = try!(Request::parse(req.get_ref())) {
546                            trace!("Handshake request received: \n{}", request);
547                            let response = try!(self.handler.on_request(request));
548                            try!(response.format(res.get_mut()));
549                            self.events.remove(Ready::readable());
550                            self.events.insert(Ready::writable());
551                        }
552                    }
553                    return Ok(());
554                }
555                Client(_) => {
556                    if let Some(_) = try!(self.socket.try_read_buf(res.get_mut())) {
557                        // TODO: see if this can be optimized with drain
558                        let end = {
559                            let data = res.get_ref();
560                            let end = data
561                                .iter()
562                                .enumerate()
563                                .take_while(|&(ind, _)| !data[..ind].ends_with(b"\r\n\r\n"))
564                                .count();
565                            if !data[..end].ends_with(b"\r\n\r\n") {
566                                return Ok(());
567                            }
568                            self.in_buffer.get_mut().extend(&data[end..]);
569                            end
570                        };
571                        res.get_mut().truncate(end);
572                    }
573                }
574            }
575        }
576
577        if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
578            trace!(
579                "Finished reading handshake response from {}",
580                self.peer_addr()
581            );
582
583            let request = try!(try!(Request::parse(req.get_ref())).ok_or(Error::new(
584                Kind::Internal,
585                "Failed to parse request after handshake is complete."
586            )));
587
588            let response = try!(try!(Response::parse(res.get_ref())).ok_or(Error::new(
589                Kind::Internal,
590                "Failed to parse response after handshake is complete."
591            )));
592
593            trace!("Handshake response received: \n{}", response);
594
595            if response.status() != 101 {
596                if response.status() != 301 && response.status() != 302 {
597                    return Err(Error::new(Kind::Protocol, "Handshake failed."));
598                } else {
599                    return Ok(());
600                }
601            }
602
603            if self.settings.key_strict {
604                let req_key = try!(request.hashed_key());
605                let res_key = try!(from_utf8(try!(response.key())));
606                if req_key != res_key {
607                    return Err(Error::new(
608                        Kind::Protocol,
609                        format!(
610                            "Received incorrect WebSocket Accept key: {} vs {}",
611                            req_key, res_key
612                        ),
613                    ));
614                }
615            }
616
617            try!(self.handler.on_response(&response));
618            try!(self.handler.on_open(Handshake {
619                request: request,
620                response: response,
621                peer_addr: self.socket.peer_addr().ok(),
622                local_addr: self.socket.local_addr().ok(),
623            }));
624
625            // check to see if there is anything to read already
626            if !self.in_buffer.get_ref().is_empty() {
627                try!(self.read_frames());
628            }
629
630            return Ok(self.check_events());
631        }
632        Err(Error::new(
633            Kind::Internal,
634            "Tried to read WebSocket handshake while not in connecting state!",
635        ))
636    }
637
638    pub fn read(&mut self) -> Result<()> {
639        if self.socket.is_negotiating() {
640            trace!("Performing TLS negotiation on {}.", self.peer_addr());
641            try!(self.socket.clear_negotiating());
642            self.write()
643        } else {
644            let res = if self.state.is_connecting() {
645                trace!("Ready to read handshake from {}.", self.peer_addr());
646                self.read_handshake()
647            } else {
648                trace!("Ready to read messages from {}.", self.peer_addr());
649                while let Some(len) = try!(self.buffer_in()) {
650                    try!(self.read_frames());
651                    if len == 0 {
652                        if self.events.is_writable() {
653                            self.events.remove(Ready::readable());
654                        } else {
655                            self.disconnect()
656                        }
657                        break;
658                    }
659                }
660                Ok(())
661            };
662
663            if self.socket.is_negotiating() && res.is_ok() {
664                self.events.remove(Ready::readable());
665                self.events.insert(Ready::writable());
666            }
667            res
668        }
669    }
670
671    fn read_frames(&mut self) -> Result<()> {
672        while let Some(mut frame) = try!(Frame::parse(&mut self.in_buffer)) {
673            match self.state {
674                // Ignore data received after receiving close frame
675                RespondingClose | FinishedClose => continue,
676                _ => (),
677            }
678
679            if self.settings.masking_strict {
680                if frame.is_masked() {
681                    if self.is_client() {
682                        return Err(Error::new(
683                            Kind::Protocol,
684                            "Received masked frame from a server endpoint.",
685                        ));
686                    }
687                } else {
688                    if self.is_server() {
689                        return Err(Error::new(
690                            Kind::Protocol,
691                            "Received unmasked frame from a client endpoint.",
692                        ));
693                    }
694                }
695            }
696
697            // This is safe whether or not a frame is masked.
698            frame.remove_mask();
699
700            if let Some(frame) = try!(self.handler.on_frame(frame)) {
701                if frame.is_final() {
702                    match frame.opcode() {
703                        // singleton data frames
704                        OpCode::Text => {
705                            trace!("Received text frame {:?}", frame);
706                            // since we are going to handle this, there can't be an ongoing
707                            // message
708                            if !self.fragments.is_empty() {
709                                return Err(Error::new(Kind::Protocol, "Received unfragmented text frame while processing fragmented message."));
710                            }
711                            let msg = Message::text(try!(String::from_utf8(frame.into_data())
712                                .map_err(|err| err.utf8_error())));
713                            try!(self.handler.on_message(msg));
714                        }
715                        OpCode::Binary => {
716                            trace!("Received binary frame {:?}", frame);
717                            // since we are going to handle this, there can't be an ongoing
718                            // message
719                            if !self.fragments.is_empty() {
720                                return Err(Error::new(Kind::Protocol, "Received unfragmented binary frame while processing fragmented message."));
721                            }
722                            let data = frame.into_data();
723                            try!(self.handler.on_message(Message::binary(data)));
724                        }
725                        // control frames
726                        OpCode::Close => {
727                            trace!("Received close frame {:?}", frame);
728                            // Closing handshake
729                            if self.state.is_closing() {
730                                if self.is_server() {
731                                    // Finished handshake, disconnect server side
732                                    self.events = Ready::empty()
733                                } else {
734                                    // We are a client, so we wait for the server to close the
735                                    // connection
736                                }
737                            } else {
738                                // Starting handshake, will send the responding close frame
739                                self.state = RespondingClose;
740                            }
741
742                            let mut close_code = [0u8; 2];
743                            let mut data = Cursor::new(frame.into_data());
744                            if let 2 = try!(data.read(&mut close_code)) {
745                                let raw_code: u16 =
746                                    (close_code[0] as u16) << 8 | (close_code[1] as u16);
747                                trace!(
748                                    "Connection to {} received raw close code: {:?}, {:?}",
749                                    self.peer_addr(),
750                                    raw_code,
751                                    close_code
752                                );
753                                let named = CloseCode::from(raw_code);
754                                if let CloseCode::Other(code) = named {
755                                    if code < 1000 ||
756                                            code >= 5000 ||
757                                            code == 1004 ||
758                                            code == 1014 ||
759                                            code == 1016 || // these below are here to pass the autobahn test suite
760                                            code == 1100 || // we shouldn't need them later
761                                            code == 2000 ||
762                                            code == 2999
763                                    {
764                                        return Err(Error::new(
765                                            Kind::Protocol,
766                                            format!(
767                                                "Received invalid close code from endpoint: {}",
768                                                code
769                                            ),
770                                        ));
771                                    }
772                                }
773                                let has_reason = {
774                                    if let Ok(reason) = from_utf8(&data.get_ref()[2..]) {
775                                        self.handler.on_close(named, reason); // note reason may be an empty string
776                                        true
777                                    } else {
778                                        self.handler.on_close(named, "");
779                                        false
780                                    }
781                                };
782
783                                if let CloseCode::Abnormal = named {
784                                    return Err(Error::new(
785                                        Kind::Protocol,
786                                        "Received abnormal close code from endpoint.",
787                                    ));
788                                } else if let CloseCode::Status = named {
789                                    return Err(Error::new(
790                                        Kind::Protocol,
791                                        "Received no status close code from endpoint.",
792                                    ));
793                                } else if let CloseCode::Restart = named {
794                                    return Err(Error::new(
795                                        Kind::Protocol,
796                                        "Restart close code is not supported.",
797                                    ));
798                                } else if let CloseCode::Again = named {
799                                    return Err(Error::new(
800                                        Kind::Protocol,
801                                        "Try again later close code is not supported.",
802                                    ));
803                                } else if let CloseCode::Tls = named {
804                                    return Err(Error::new(
805                                        Kind::Protocol,
806                                        "Received TLS close code outside of TLS handshake.",
807                                    ));
808                                } else {
809                                    if !self.state.is_closing() {
810                                        if has_reason {
811                                            try!(self.send_close(named, "")); // note this drops any extra close data
812                                        } else {
813                                            try!(self.send_close(CloseCode::Invalid, ""));
814                                        }
815                                    } else {
816                                        self.state = FinishedClose;
817                                    }
818                                }
819                            } else {
820                                // This is not an error. It is allowed behavior in the
821                                // protocol, so we don't trigger an error.
822                                // "If there is no such data in the Close control frame,
823                                // _The WebSocket Connection Close Reason_ is the empty string."
824                                self.handler.on_close(CloseCode::Status, "");
825                                if !self.state.is_closing() {
826                                    try!(self.send_close(CloseCode::Empty, ""));
827                                } else {
828                                    self.state = FinishedClose;
829                                }
830                            }
831                        }
832                        OpCode::Ping => {
833                            trace!("Received ping frame {:?}", frame);
834                            try!(self.send_pong(frame.into_data()));
835                        }
836                        OpCode::Pong => {
837                            trace!("Received pong frame {:?}", frame);
838                            // no ping validation for now
839                        }
840                        // last fragment
841                        OpCode::Continue => {
842                            trace!("Received final fragment {:?}", frame);
843                            if let Some(first) = self.fragments.pop_front() {
844                                let size = self.fragments.iter().fold(
845                                    first.payload().len() + frame.payload().len(),
846                                    |len, frame| len + frame.payload().len(),
847                                );
848                                match first.opcode() {
849                                    OpCode::Text => {
850                                        trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
851                                        let mut data = Vec::with_capacity(size);
852                                        data.extend(first.into_data());
853                                        while let Some(frame) = self.fragments.pop_front() {
854                                            data.extend(frame.into_data());
855                                        }
856                                        data.extend(frame.into_data());
857
858                                        let string =
859                                            try!(String::from_utf8(data)
860                                                .map_err(|err| err.utf8_error()));
861
862                                        trace!(
863                                            "Calling handler with constructed message: {:?}",
864                                            string
865                                        );
866                                        try!(self.handler.on_message(Message::text(string)));
867                                    }
868                                    OpCode::Binary => {
869                                        trace!("Constructing binary message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
870                                        let mut data = Vec::with_capacity(size);
871                                        data.extend(first.into_data());
872
873                                        while let Some(frame) = self.fragments.pop_front() {
874                                            data.extend(frame.into_data());
875                                        }
876
877                                        data.extend(frame.into_data());
878
879                                        trace!(
880                                            "Calling handler with constructed message: {:?}",
881                                            data
882                                        );
883                                        try!(self.handler.on_message(Message::binary(data)));
884                                    }
885                                    _ => {
886                                        return Err(Error::new(
887                                            Kind::Protocol,
888                                            "Encounted fragmented control frame.",
889                                        ))
890                                    }
891                                }
892                            } else {
893                                return Err(Error::new(
894                                    Kind::Protocol,
895                                    "Unable to reconstruct fragmented message. No first frame.",
896                                ));
897                            }
898                        }
899                        _ => return Err(Error::new(Kind::Protocol, "Encountered invalid opcode.")),
900                    }
901                } else {
902                    if frame.is_control() {
903                        return Err(Error::new(
904                            Kind::Protocol,
905                            "Encounted fragmented control frame.",
906                        ));
907                    } else {
908                        trace!("Received non-final fragment frame {:?}", frame);
909                        if !self.settings.fragments_grow
910                            && self.settings.fragments_capacity == self.fragments.len()
911                        {
912                            return Err(Error::new(Kind::Capacity, "Exceeded max fragments."));
913                        } else {
914                            self.fragments.push_back(frame)
915                        }
916                    }
917                }
918            }
919        }
920        Ok(())
921    }
922
923    pub fn write(&mut self) -> Result<()> {
924        if self.socket.is_negotiating() {
925            trace!("Performing TLS negotiation on {}.", self.peer_addr());
926            try!(self.socket.clear_negotiating());
927            self.read()
928        } else {
929            let res = if self.state.is_connecting() {
930                trace!("Ready to write handshake to {}.", self.peer_addr());
931                self.write_handshake()
932            } else {
933                trace!("Ready to write messages to {}.", self.peer_addr());
934
935                // Start out assuming that this write will clear the whole buffer
936                self.events.remove(Ready::writable());
937
938                if let Some(len) = try!(self.socket.try_write_buf(&mut self.out_buffer)) {
939                    trace!("Wrote {} bytes to {}", len, self.peer_addr());
940                    let finished = len == 0
941                        || self.out_buffer.position() == self.out_buffer.get_ref().len() as u64;
942                    if finished {
943                        match self.state {
944                            // we are are a server that is closing and just wrote out our confirming
945                            // close frame, let's disconnect
946                            FinishedClose if self.is_server() => {
947                                return Ok(self.events = Ready::empty())
948                            }
949                            _ => (),
950                        }
951                    }
952                }
953
954                // Check if there is more to write so that the connection will be rescheduled
955                Ok(self.check_events())
956            };
957
958            if self.socket.is_negotiating() && res.is_ok() {
959                self.events.remove(Ready::writable());
960                self.events.insert(Ready::readable());
961            }
962            res
963        }
964    }
965
966    pub fn send_message(&mut self, msg: Message) -> Result<()> {
967        if self.state.is_closing() {
968            trace!(
969                "Connection is closing. Ignoring request to send message {:?} to {}.",
970                msg,
971                self.peer_addr()
972            );
973            return Ok(());
974        }
975
976        let opcode = msg.opcode();
977        trace!("Message opcode {:?}", opcode);
978        let data = msg.into_data();
979
980        if let Some(frame) = try!(self
981            .handler
982            .on_send_frame(Frame::message(data, opcode, true)))
983        {
984            if frame.payload().len() > self.settings.fragment_size {
985                trace!("Chunking at {:?}.", self.settings.fragment_size);
986                // note this copies the data, so it's actually somewhat expensive to fragment
987                let mut chunks = frame
988                    .payload()
989                    .chunks(self.settings.fragment_size)
990                    .peekable();
991                let chunk = chunks.next().expect("Unable to get initial chunk!");
992
993                let mut first = Frame::message(Vec::from(chunk), opcode, false);
994
995                // Match reserved bits from original to keep extension status intact
996                first.set_rsv1(frame.has_rsv1());
997                first.set_rsv2(frame.has_rsv2());
998                first.set_rsv3(frame.has_rsv3());
999
1000                try!(self.buffer_frame(first));
1001
1002                while let Some(chunk) = chunks.next() {
1003                    if let Some(_) = chunks.peek() {
1004                        try!(self.buffer_frame(Frame::message(
1005                            Vec::from(chunk),
1006                            OpCode::Continue,
1007                            false
1008                        )));
1009                    } else {
1010                        try!(self.buffer_frame(Frame::message(
1011                            Vec::from(chunk),
1012                            OpCode::Continue,
1013                            true
1014                        )));
1015                    }
1016                }
1017            } else {
1018                trace!("Sending unfragmented message frame.");
1019                // true means that the message is done
1020                try!(self.buffer_frame(frame));
1021            }
1022        }
1023        Ok(self.check_events())
1024    }
1025
1026    #[inline]
1027    pub fn send_ping(&mut self, data: Vec<u8>) -> Result<()> {
1028        if self.state.is_closing() {
1029            trace!(
1030                "Connection is closing. Ignoring request to send ping {:?} to {}.",
1031                data,
1032                self.peer_addr()
1033            );
1034            return Ok(());
1035        }
1036        trace!("Sending ping to {}.", self.peer_addr());
1037
1038        if let Some(frame) = try!(self.handler.on_send_frame(Frame::ping(data))) {
1039            try!(self.buffer_frame(frame));
1040        }
1041        Ok(self.check_events())
1042    }
1043
1044    #[inline]
1045    pub fn send_pong(&mut self, data: Vec<u8>) -> Result<()> {
1046        if self.state.is_closing() {
1047            trace!(
1048                "Connection is closing. Ignoring request to send pong {:?} to {}.",
1049                data,
1050                self.peer_addr()
1051            );
1052            return Ok(());
1053        }
1054        trace!("Sending pong to {}.", self.peer_addr());
1055
1056        if let Some(frame) = try!(self.handler.on_send_frame(Frame::pong(data))) {
1057            try!(self.buffer_frame(frame));
1058        }
1059        Ok(self.check_events())
1060    }
1061
1062    #[inline]
1063    pub fn send_close<R>(&mut self, code: CloseCode, reason: R) -> Result<()>
1064    where
1065        R: Borrow<str>,
1066    {
1067        match self.state {
1068            // We are responding to a close frame the other endpoint, when this frame goes out, we
1069            // are done.
1070            RespondingClose => self.state = FinishedClose,
1071            // Multiple close frames are being sent from our end, ignore the later frames
1072            AwaitingClose | FinishedClose => {
1073                trace!(
1074                    "Connection is already closing. Ignoring close {:?} -- {:?} to {}.",
1075                    code,
1076                    reason.borrow(),
1077                    self.peer_addr()
1078                );
1079                return Ok(self.check_events());
1080            }
1081            // We are initiating a closing handshake.
1082            Open => self.state = AwaitingClose,
1083            Connecting(_, _) => {
1084                debug_assert!(false, "Attempted to close connection while not yet open.")
1085            }
1086        }
1087
1088        trace!(
1089            "Sending close {:?} -- {:?} to {}.",
1090            code,
1091            reason.borrow(),
1092            self.peer_addr()
1093        );
1094
1095        if let Some(frame) = try!(self
1096            .handler
1097            .on_send_frame(Frame::close(code, reason.borrow())))
1098        {
1099            try!(self.buffer_frame(frame));
1100        }
1101
1102        trace!("Connection to {} is now closing.", self.peer_addr());
1103
1104        Ok(self.check_events())
1105    }
1106
1107    fn check_events(&mut self) {
1108        if !self.state.is_connecting() {
1109            self.events.insert(Ready::readable());
1110            if self.out_buffer.position() < self.out_buffer.get_ref().len() as u64 {
1111                self.events.insert(Ready::writable());
1112            }
1113        }
1114    }
1115
1116    fn buffer_frame(&mut self, mut frame: Frame) -> Result<()> {
1117        try!(self.check_buffer_out(&frame));
1118
1119        if self.is_client() {
1120            frame.set_mask();
1121        }
1122
1123        trace!("Buffering frame to {}:\n{}", self.peer_addr(), frame);
1124
1125        let pos = self.out_buffer.position();
1126        try!(self.out_buffer.seek(SeekFrom::End(0)));
1127        try!(frame.format(&mut self.out_buffer));
1128        try!(self.out_buffer.seek(SeekFrom::Start(pos)));
1129        Ok(())
1130    }
1131
1132    fn check_buffer_out(&mut self, frame: &Frame) -> Result<()> {
1133        if self.out_buffer.get_ref().capacity() <= self.out_buffer.get_ref().len() + frame.len() {
1134            // extend
1135            let mut new = Vec::with_capacity(self.out_buffer.get_ref().capacity());
1136            new.extend(&self.out_buffer.get_ref()[self.out_buffer.position() as usize..]);
1137            if new.len() == new.capacity() {
1138                if self.settings.out_buffer_grow {
1139                    new.reserve(self.settings.out_buffer_capacity)
1140                } else {
1141                    return Err(Error::new(
1142                        Kind::Capacity,
1143                        "Maxed out output buffer for connection.",
1144                    ));
1145                }
1146            }
1147            self.out_buffer = Cursor::new(new);
1148        }
1149        Ok(())
1150    }
1151
1152    fn buffer_in(&mut self) -> Result<Option<usize>> {
1153        trace!("Reading buffer for connection to {}.", self.peer_addr());
1154        if let Some(len) = try!(self.socket.try_read_buf(self.in_buffer.get_mut())) {
1155            trace!("Buffered {}.", len);
1156            if self.in_buffer.get_ref().len() == self.in_buffer.get_ref().capacity() {
1157                // extend
1158                let mut new = Vec::with_capacity(self.in_buffer.get_ref().capacity());
1159                new.extend(&self.in_buffer.get_ref()[self.in_buffer.position() as usize..]);
1160                if new.len() == new.capacity() {
1161                    if self.settings.in_buffer_grow {
1162                        if new.capacity() >= self.settings.max_in_buffer {
1163                            return Err(Error::new(
1164                                Kind::Capacity,
1165                                "Maxed out input buffer for connection.",
1166                            ));
1167                        }
1168                        new.reserve(self.settings.in_buffer_capacity);
1169                    } else {
1170                        return Err(Error::new(
1171                            Kind::Capacity,
1172                            "Maxed out input buffer for connection.",
1173                        ));
1174                    }
1175                }
1176                self.in_buffer = Cursor::new(new);
1177            }
1178            Ok(Some(len))
1179        } else {
1180            Ok(None)
1181        }
1182    }
1183}