websocket_simple/
connection.rs

1#[warn(dead_code)]
2
3use std::mem::replace;
4use std::mem::transmute;
5use std::borrow::Borrow;
6use std::io::{Write, Read, Cursor, Seek, SeekFrom};
7use std::net::SocketAddr;
8use std::net::TcpStream;
9use std::collections::VecDeque;
10use std::str::from_utf8;
11
12use time;
13use url;
14#[cfg(feature="ssl")]
15use openssl::ssl::SslStream;
16
17use message::Message;
18use handshake::{Handshake, Request, Response};
19use frame::Frame;
20use protocol::{CloseCode, OpCode};
21use result::{Result, Error, Kind};
22use handler::Handler;
23use stream::Stream;
24use Settings;
25
26use self::State::*;
27use self::Endpoint::*;
28
/// Lifecycle stage of a WebSocket connection.
#[derive(Debug)]
pub enum State {
    /// TCP connection accepted, waiting for the opening handshake to complete.
    /// Carries the handshake request buffer followed by the response buffer.
    Connecting(Cursor<Vec<u8>>, Cursor<Vec<u8>>),
    /// Handshake finished; ready to send/receive messages.
    Open,
    /// NOTE(review): presumably entered once this endpoint has sent a close
    /// frame and is waiting for the peer's reply -- confirm in `send_close`.
    AwaitingClose,
    /// A close frame was received from the peer while the connection was not
    /// already closing; a responding close frame still has to go out.
    RespondingClose,
    /// The connection is finished; `Connection::is_over` reports `true`.
    FinishedClose,
}
39
/// Which side of the connection this endpoint is; clearer than a boolean.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Endpoint {
    /// The initiating side: will mask outgoing frames.
    Client,
    /// The accepting side: won't mask outgoing frames.
    Server,
}
48
49impl State {
50
51    #[inline]
52    pub fn is_connecting(&self) -> bool {
53        match *self {
54            State::Connecting(..) => true,
55            _ => false,
56        }
57    }
58
59    #[allow(dead_code)]
60    #[inline]
61    pub fn is_open(&self) -> bool {
62        match *self {
63            State::Open => true,
64            _ => false,
65        }
66    }
67
68    #[inline]
69    pub fn is_closing(&self) -> bool {
70        match *self {
71            State::AwaitingClose => true,
72            State::FinishedClose => true,
73            _ => false,
74        }
75    }
76}
77
78pub struct Connection<Handler>
79{
80    socket: Stream,
81    state: State,
82    endpoint: Endpoint,
83
84    fragments: VecDeque<Frame>,
85
86    in_buffer: Cursor<Vec<u8>>,
87    out_buffer: Cursor<Vec<u8>>,
88
89    cur_data: Cursor<Vec<u8>>,
90
91    addresses: Vec<SocketAddr>,
92
93    settings: Settings,
94    handler: Handler,
95
96    read_time: u64,
97
98    over: bool,
99}
100
101impl<H> Connection<H>
102    where H: Handler
103{
104    pub fn new(sock: TcpStream, handler: H, settings: Settings) -> Connection<H> {
105        Connection {
106            socket: Stream::tcp(sock),
107            state: Connecting(
108                Cursor::new(Vec::with_capacity(2048)),
109                Cursor::new(Vec::with_capacity(2048)),
110            ),
111            endpoint: Endpoint::Server,
112            fragments: VecDeque::with_capacity(settings.fragments_capacity),
113            in_buffer: Cursor::new(Vec::with_capacity(204800)),
114            out_buffer: Cursor::new(Vec::with_capacity(204800)),
115            cur_data: Cursor::new(Vec::with_capacity(204800)),
116            handler: handler,
117            addresses: Vec::new(),
118            settings: settings,
119            read_time: time::precise_time_ns() / 1000_000,
120            over: false,
121        }
122    }
123
124    pub fn as_server(&mut self) -> Result<()> {
125        Ok(())
126    }
127
128    pub fn as_client(&mut self, url: &url::Url, addrs: Vec<SocketAddr>) -> Result<()> {
129        if let Connecting(ref mut req, _) = self.state {
130            self.addresses = addrs;
131            self.endpoint = Endpoint::Client;
132            try!(self.handler.build_request(url)).format(req.get_mut())
133        } else {
134            Err(Error::new(
135                Kind::Internal,
136                "Tried to set connection to client while not connecting."))
137        }
138    }
139
140    #[cfg(feature="ssl")]
141    pub fn encrypt(&mut self) -> Result<()> {
142        let ssl_stream = match self.endpoint {
143            Server => try!(SslStream::accept(
144                try!(self.handler.build_ssl()),
145                try!(self.socket().try_clone()))),
146
147            Client => try!(SslStream::connect(
148                try!(self.handler.build_ssl()),
149                try!(self.socket().try_clone()))),
150        };
151
152        Ok(self.socket = Stream::tls(ssl_stream))
153    }
154
155    pub fn socket(&self) -> &TcpStream {
156        &self.socket.evented()
157    }
158
159    fn peer_addr(&self) -> String {
160        if let Ok(addr) = self.socket.peer_addr() {
161            addr.to_string()
162        } else {
163            "UNKNOWN".into()
164        }
165    }
166
167    // Resetting may be necessary in order to try all possible addresses for a server
168    #[cfg(feature="ssl")]
169    pub fn reset(&mut self) -> Result<()> {
170        if self.is_client() {
171            if let Connecting(ref mut req, ref mut res) = self.state {
172                req.set_position(0);
173                res.set_position(0);
174
175                if let Some(ref addr) = self.addresses.pop() {
176                    let sock = try!(TcpStream::connect(addr));
177                    if self.socket.is_tls() {
178                        Ok(self.socket = Stream::tls(
179                                try!(SslStream::connect(
180                                    try!(self.handler.build_ssl()),
181                                    sock))))
182
183                    } else {
184                        Ok(self.socket = Stream::tcp(sock))
185                    }
186                } else {
187                    if self.settings.panic_on_new_connection {
188                        panic!("Unable to connect to server.");
189                    }
190                    Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
191                }
192            } else {
193                Err(Error::new(Kind::Internal, "Unable to reset client connection because it is active."))
194            }
195        } else {
196            Err(Error::new(Kind::Internal, "Server connections cannot be reset."))
197        }
198    }
199
200    #[cfg(not(feature="ssl"))]
201    pub fn reset(&mut self) -> Result<()> {
202        if self.is_client() {
203            if let Connecting(ref mut req, ref mut res) = self.state {
204                req.set_position(0);
205                res.set_position(0);
206
207                if let Some(ref addr) = self.addresses.pop() {
208                    let sock = try!(TcpStream::connect(addr));
209                    Ok(self.socket = Stream::tcp(sock))
210                } else {
211                    if self.settings.panic_on_new_connection {
212                        panic!("Unable to connect to server.");
213                    }
214                    Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
215                }
216            } else {
217                Err(Error::new(Kind::Internal, "Unable to reset client connection because it is active."))
218            }
219        } else {
220            Err(Error::new(Kind::Internal, "Server connections cannot be reset."))
221        }
222    }
223
224    pub fn set_read_time(&mut self, read_time: u64) {
225        self.read_time = read_time
226    }
227
228    pub fn get_read_time(&self) -> u64 {
229        self.read_time
230    }
231
232    pub fn is_over(&self) -> bool {
233        match self.state {
234            State::FinishedClose => return true,
235            _ => (),
236        }
237        self.over
238    }
239
240    pub fn is_closing(&self) -> bool {
241        self.state.is_closing()
242    }
243
244    pub fn is_client(&self) -> bool {
245        match self.endpoint {
246            Client => true,
247            Server => false,
248        }
249    }
250
251    pub fn is_server(&self) -> bool {
252        match self.endpoint {
253            Client => false,
254            Server => true,
255        }
256    }
257
258    pub fn shutdown(&mut self) {
259        self.handler.on_shutdown();
260        if let Err(err) = self.send_close(CloseCode::Away, "Shutting down.") {
261            self.handler.on_error(err);
262            self.disconnect()
263        }
264    }
265
266    pub fn error(&mut self, err: Error) {
267        match self.state {
268            Connecting(_, ref mut res) => {
269                match err.kind {
270                    #[cfg(feature="ssl")]
271                    Kind::Ssl(_) | Kind::Io(_) => {
272                        self.handler.on_error(err);
273                        self.over = true
274                    }
275                    Kind::Protocol => {
276                        let msg = err.to_string();
277                        self.handler.on_error(err);
278                        if let Server = self.endpoint {
279                            res.get_mut().clear();
280
281                            if let Err(err) = write!(
282                                    res.get_mut(),
283                                    "HTTP/1.1 400 Bad Request\r\n\r\n{}", msg)
284                            {
285                                self.handler.on_error(Error::from(err));
286                            }
287                        }
288                        self.over = true
289                    }
290                    _ => {
291                        let msg = err.to_string();
292                        self.handler.on_error(err);
293                        if let Server = self.endpoint {
294                            res.get_mut().clear();
295                            if let Err(err) = write!(
296                                    res.get_mut(),
297                                    "HTTP/1.1 500 Internal Server Error\r\n\r\n{}", msg) {
298                                self.handler.on_error(Error::from(err));
299                            }
300                        }
301                        self.over = true
302                    }
303                }
304
305            }
306            _ => {
307                match err.kind {
308                    Kind::Internal => {
309                        if self.settings.panic_on_internal {
310                            panic!("Panicking on internal error -- {}", err);
311                        }
312                        let reason = format!("{}", err);
313
314                        self.handler.on_error(err);
315                        if let Err(err) = self.send_close(CloseCode::Error, reason) {
316                            self.handler.on_error(err);
317                            self.disconnect()
318                        }
319                    }
320                    Kind::Capacity => {
321                        if self.settings.panic_on_capacity {
322                            panic!("Panicking on capacity error -- {}", err);
323                        }
324                        let reason = format!("{}", err);
325
326                        self.handler.on_error(err);
327                        if let Err(err) = self.send_close(CloseCode::Size, reason) {
328                            self.handler.on_error(err);
329                            self.disconnect()
330                        }
331                    }
332                    Kind::Protocol => {
333                        if self.settings.panic_on_protocol {
334                            panic!("Panicking on protocol error -- {}", err);
335                        }
336                        let reason = format!("{}", err);
337
338                        self.handler.on_error(err);
339                        if let Err(err) = self.send_close(CloseCode::Protocol, reason) {
340                            self.handler.on_error(err);
341                            self.disconnect()
342                        }
343                    }
344                    Kind::Encoding(_) => {
345                        if self.settings.panic_on_encoding {
346                            panic!("Panicking on encoding error -- {}", err);
347                        }
348                        let reason = format!("{}", err);
349
350                        self.handler.on_error(err);
351                        if let Err(err) = self.send_close(CloseCode::Invalid, reason) {
352                            self.handler.on_error(err);
353                            self.disconnect()
354                        }
355                    }
356                    Kind::Http(_) => {
357                        // This may happen if some handler writes a bad response
358                        self.handler.on_error(err);
359                        error!("Disconnecting WebSocket.");
360                        self.disconnect()
361                    }
362                    Kind::Custom(_) => {
363                        self.handler.on_error(err);
364                    }
365                    _ => {
366                        if self.settings.panic_on_io {
367                            panic!("Panicking on io error -- {}", err);
368                        }
369                        self.handler.on_error(err);
370                        self.disconnect()
371                    }
372                }
373            }
374        }
375    }
376
377    pub fn disconnect(&mut self) {
378        match self.state {
379            AwaitingClose | RespondingClose | FinishedClose | Connecting(_, _)=> (),
380            _ => {
381                self.handler.on_close(CloseCode::Abnormal, "");
382            }
383        }
384        self.over = true;
385    }
386
387    pub fn consume(self) -> H {
388        self.handler
389    }
390
391    fn write_handshake(&mut self) -> Result<()> {
392        if let Connecting(ref mut req, ref mut res) = self.state {
393            match self.endpoint {
394                Server => {
395                    let _ = try!(self.socket.do_write_buf(res));
396                }
397                Client =>  {
398                    let _ = try!(self.socket.do_write_buf(req));
399                    trace!("Finished writing handshake request to {}",
400                        self.socket
401                            .peer_addr()
402                            .map(|addr| addr.to_string())
403                            .unwrap_or("UNKNOWN".into()));
404                    return Ok(())
405                }
406            }
407        }
408
409        if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
410            trace!("Finished writing handshake response to {}", self.peer_addr());
411
412            let request = match Request::parse(req.get_ref()) {
413                Ok(Some(req)) => req,
414                _ => {
415                    // An error should already have been sent for the first time it failed to
416                    // parse. We don't call disconnect here because `on_open` hasn't been called yet.
417                    self.state = FinishedClose;
418                    self.over = true;
419                    return Ok(())
420                }
421            };
422
423            let response = try!(try!(Response::parse(res.get_ref())).ok_or(
424                Error::new(Kind::Internal, "Failed to parse response after handshake is complete.")));
425
426            if response.status() != 101 {
427                self.over = true;
428                return Ok(())
429            } else {
430                try!(self.handler.on_open(Handshake {
431                    request: request,
432                    response: response,
433                    peer_addr: self.socket.peer_addr().ok(),
434                    local_addr: self.socket.local_addr().ok(),
435                }));
436                debug!("Connection to {} is now open.", self.peer_addr());
437                return self.check_events()
438            }
439        } else {
440            Err(Error::new(Kind::Internal, "Tried to write WebSocket handshake while not in connecting state!"))
441        }
442    }
443
444    pub fn new_data_received(&mut self, data: &[u8]) {
445        let _ = self.cur_data.write(data);
446    }
447
448    fn do_read_buf(cur_data: &mut Cursor<Vec<u8>>, val: &mut Cursor<Vec<u8>>) -> Result<usize> {
449        let position = val.position();
450        let len = {
451            let data = cur_data.get_mut();
452            let len = data.len();
453            let _ = try!(val.write(&data[..]));
454            data.clear();
455            len
456        };
457        cur_data.set_position(0);
458        val.set_position(position);
459        Ok(len)
460    }
461
462    fn read_handshake(&mut self) -> Result<()> {
463        let mut is_need_write = false;
464        let mut is_need_return = false;
465        if let Connecting(ref mut req, ref mut res) = self.state {
466            match self.endpoint {
467                Server => {
468                    let len = try!(Self::do_read_buf(&mut self.cur_data, req));
469                    if len == 0 {
470                        return Err(Error::new(Kind::CloseSingal, "Connect Read Empty Size"));
471                    }
472                    if let Some(ref request) = try!(Request::parse(req.get_ref())) {
473                        trace!("Handshake request received: \n{}", request);
474                        let response = try!(self.handler.on_request(request));
475                        try!(response.format(res.get_mut()));
476                        is_need_write = true;
477                    }
478                    is_need_return = true;
479                }
480                Client => {
481                    let len = try!(Self::do_read_buf(&mut self.cur_data, res));
482                    if len == 0 {
483                        return Err(Error::new(Kind::CloseSingal, "Connect Read Empty Size"));
484                    }
485                    // TODO: see if this can be optimized with drain
486                    let end = {
487                        let data = res.get_ref();
488                        let end = data.iter()
489                                      .enumerate()
490                                      .take_while(|&(ind, _)| !data[..ind].ends_with(b"\r\n\r\n"))
491                                      .count();
492                        if !data[..end].ends_with(b"\r\n\r\n") {
493                            return Ok(())
494                        }
495                        self.in_buffer.get_mut().extend(&data[end..]);
496                        end
497                    };
498                    res.get_mut().truncate(end);
499                }
500            }
501        }
502
503        if is_need_write {
504            try!(self.write());
505        }
506
507        if is_need_return {
508            return Ok(());
509        }
510
511        if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
512            trace!("Finished reading handshake response from {}", self.peer_addr());
513
514            let request = try!(try!(Request::parse(req.get_ref())).ok_or(
515                Error::new(Kind::Internal, "Failed to parse request after handshake is complete.")));
516
517            let response = try!(try!(Response::parse(res.get_ref())).ok_or(
518                Error::new(Kind::Internal, "Failed to parse response after handshake is complete.")));
519
520            trace!("Handshake response received: \n{}", response);
521
522            if response.status() != 101 {
523                if response.status() != 301 && response.status() != 302 {
524                    return Err(Error::new(Kind::Protocol, "Handshake failed."));
525                } else {
526                    return Ok(())
527                }
528            }
529
530            if self.settings.key_strict {
531                let req_key = try!(request.hashed_key());
532                let res_key = try!(from_utf8(try!(response.key())));
533                if req_key != res_key {
534                    return Err(Error::new(Kind::Protocol, format!("Received incorrect WebSocket Accept key: {} vs {}", req_key, res_key)));
535                }
536            }
537
538            try!(self.handler.on_response(&response));
539            try!(self.handler.on_open(Handshake {
540                    request: request,
541                    response: response,
542                    peer_addr: self.socket.peer_addr().ok(),
543                    local_addr: self.socket.local_addr().ok(),
544            }));
545
546            // check to see if there is anything to read already
547            if !self.in_buffer.get_ref().is_empty() {
548                try!(self.read_frames());
549            }
550
551            return self.check_events()
552        }
553        Err(Error::new(Kind::Internal, "Tried to read WebSocket handshake while not in connecting state!"))
554    }
555
556    pub fn read(&mut self) -> Result<()> {
557        if self.socket.is_negotiating() {
558            trace!("Performing TLS negotiation on {}.", self.peer_addr());
559            try!(self.socket.clear_negotiating());
560            self.write()
561        } else {
562            let res = if self.state.is_connecting() {
563                trace!("Ready to read handshake from {}.", self.peer_addr());
564                self.read_handshake()
565            } else {
566                trace!("Ready to read messages from {}.", self.peer_addr());
567                if let Some(_) = try!(self.buffer_in()) {
568                     // consume the whole buffer if possible
569                    if let Err(err) = self.read_frames() {
570                        // break on first IO error, other errors don't imply that the buffer is bad
571                        // has error frame so will close the io.
572                        // return Err(err);
573                        if let Kind::Io(_) = err.kind {
574                            return Err(err)
575                        }
576                        self.error(err)
577                    }
578                }
579                Ok(())
580            };
581
582            if self.socket.is_negotiating() && res.is_ok() {
583            }
584            res
585        }
586    }
587
588    fn read_frames(&mut self) -> Result<()> {
589        while let Some(mut frame) = try!(Frame::parse(&mut self.in_buffer)) {
590            match self.state {
591                // Ignore data received after receiving close frame
592                RespondingClose | FinishedClose => continue,
593                _ => (),
594            }
595
596            if self.settings.masking_strict {
597                if frame.is_masked() {
598                    if self.is_client() {
599                        return Err(Error::new(Kind::Protocol, "Received masked frame from a server endpoint."))
600                    }
601                } else {
602                    if self.is_server() {
603                        return Err(Error::new(Kind::Protocol, "Received unmasked frame from a client endpoint."))
604                    }
605                }
606            }
607
608            // This is safe whether or not a frame is masked.
609            frame.remove_mask();
610
611            if let Some(frame) = try!(self.handler.on_frame(frame)) {
612                if frame.is_final() {
613                    match frame.opcode() {
614                        // singleton data frames
615                        OpCode::Text => {
616                            //trace!("Received text frame {:?}", frame);
617                            // since we are going to handle this, there can't be an ongoing
618                            // message
619                            if !self.fragments.is_empty() {
620                                return Err(Error::new(Kind::Protocol, "Received unfragmented text frame while processing fragmented message."))
621                            }
622                            let msg = Message::text(try!(String::from_utf8(frame.into_data()).map_err(|err| err.utf8_error())));
623                            try!(self.handler.on_message(msg));
624                        }
625                        OpCode::Binary => {
626                            //trace!("Received binary frame {:?}", frame);
627                            // since we are going to handle this, there can't be an ongoing
628                            // message
629                            if !self.fragments.is_empty() {
630                                return Err(Error::new(Kind::Protocol, "Received unfragmented binary frame while processing fragmented message."))
631                            }
632                            let data = frame.into_data();
633                            try!(self.handler.on_message(Message::binary(data)));
634                        }
635                        // control frames
636                        OpCode::Close => {
637                            //trace!("Received close frame {:?}", frame);
638                            // Closing handshake
639                            if self.state.is_closing() {
640
641                                if self.is_server() {
642                                    // Finished handshake, disconnect server side
643                                    self.over = true;
644                                } else {
645                                    // We are a client, so we wait for the server to close the
646                                    // connection
647                                }
648
649                            } else {
650
651                                // Starting handshake, will send the responding close frame
652                                self.state = RespondingClose;
653
654                            }
655
656                            let mut close_code = [0u8; 2];
657                            let mut data = Cursor::new(frame.into_data());
658                            if let 2 = try!(data.read(&mut close_code)) {
659                                let code_be: u16 = unsafe {transmute(close_code) };
660                                trace!("Connection to {} received raw close code: {:?}, {:b}", self.peer_addr(), code_be, code_be);
661                                let named = CloseCode::from(u16::from_be(code_be));
662                                if let CloseCode::Other(code) = named {
663                                    if
664                                            code < 1000 ||
665                                            code >= 5000 ||
666                                            code == 1004 ||
667                                            code == 1014 ||
668                                            code == 1016 || // these below are here to pass the autobahn test suite
669                                            code == 1100 || // we shouldn't need them later
670                                            code == 2000 ||
671                                            code == 2999
672                                    {
673                                        return Err(Error::new(Kind::Protocol, format!("Received invalid close code from endpoint: {}", code)))
674                                    }
675                                }
676                                let has_reason = {
677                                    if let Ok(reason) = from_utf8(&data.get_ref()[2..]) {
678                                        self.handler.on_close(named, reason); // note reason may be an empty string
679                                        true
680                                    } else {
681                                        self.handler.on_close(named, "");
682                                        false
683                                    }
684                                };
685
686                                if let CloseCode::Abnormal = named {
687                                    return Err(Error::new(Kind::Protocol, "Received abnormal close code from endpoint."))
688                                } else if let CloseCode::Status = named {
689                                    return Err(Error::new(Kind::Protocol, "Received no status close code from endpoint."))
690                                } else if let CloseCode::Restart = named {
691                                    return Err(Error::new(Kind::Protocol, "Restart close code is not supported."))
692                                } else if let CloseCode::Again = named {
693                                    return Err(Error::new(Kind::Protocol, "Try again later close code is not supported."))
694                                } else if let CloseCode::Tls = named {
695                                    return Err(Error::new(Kind::Protocol, "Received TLS close code outside of TLS handshake."))
696                                } else {
697                                    if !self.state.is_closing() {
698                                        if has_reason {
699                                            try!(self.send_close(named, "")); // note this drops any extra close data
700                                        } else {
701                                            try!(self.send_close(CloseCode::Invalid, ""));
702                                        }
703                                    }
704                                }
705                            } else {
706                                // This is not an error. It is allowed behavior in the
707                                // protocol, so we don't trigger an error
708                                self.handler.on_close(CloseCode::Status, "Unable to read close code. Sending empty close frame.");
709                                if !self.state.is_closing() {
710                                    try!(self.send_close(CloseCode::Empty, ""));
711                                }
712                            }
713                        }
714                        OpCode::Ping => {
715                            //trace!("Received ping frame {:?}", frame);
716                            try!(self.send_pong(frame.into_data()));
717                        }
718                        OpCode::Pong => {
719                            //trace!("Received pong frame {:?}", frame);
720                            // no ping validation for now
721                        }
722                        // last fragment
723                        OpCode::Continue => {
724                            //trace!("Received final fragment {:?}", frame);
725                            if let Some(first) = self.fragments.pop_front() {
726                                let size = self.fragments.iter().fold(first.payload().len() + frame.payload().len(), |len, frame| len + frame.payload().len());
727                                match first.opcode() {
728                                    OpCode::Text => {
729                                        trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
730                                        let mut data = Vec::with_capacity(size);
731                                        data.extend(first.into_data());
732                                        while let Some(frame) = self.fragments.pop_front() {
733                                            data.extend(frame.into_data());
734                                        }
735                                        data.extend(frame.into_data());
736
737                                        let string = try!(String::from_utf8(data).map_err(|err| err.utf8_error()));
738
739                                        trace!("Calling handler with constructed message: {:?}", string);
740                                        try!(self.handler.on_message(Message::text(string)));
741                                    }
742                                    OpCode::Binary => {
743                                        trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
744                                        let mut data = Vec::with_capacity(size);
745                                        data.extend(first.into_data());
746
747                                        while let Some(frame) = self.fragments.pop_front() {
748                                            data.extend(frame.into_data());
749                                        }
750
751                                        data.extend(frame.into_data());
752
753                                        trace!("Calling handler with constructed message: {:?}", data);
754                                        try!(self.handler.on_message(Message::binary(data)));
755                                    }
756                                    _ => {
757                                        return Err(Error::new(Kind::Protocol, "Encounted fragmented control frame."))
758                                    }
759                                }
760                            } else {
761                                return Err(Error::new(Kind::Protocol, "Unable to reconstruct fragmented message. No first frame."))
762                            }
763                        }
764                        _ => return Err(Error::new(Kind::Protocol, "Encountered invalid opcode.")),
765                    }
766                } else {
767                    if frame.is_control() {
768                        return Err(Error::new(Kind::Protocol, "Encounted fragmented control frame."))
769                    } else {
770                        //trace!("Received non-final fragment frame {:?}", frame);
771                        if !self.settings.fragments_grow &&
772                            self.settings.fragments_capacity == self.fragments.len()
773                        {
774                            return Err(Error::new(Kind::Capacity, "Exceeded max fragments."))
775                        } else {
776                            self.fragments.push_back(frame)
777                        }
778                    }
779                }
780            }
781        }
782        Ok(())
783    }
784
785    pub fn write(&mut self) -> Result<()> {
786        if self.socket.is_negotiating() {
787            trace!("Performing TLS negotiation on {}.", self.peer_addr());
788            try!(self.socket.clear_negotiating());
789            self.read()
790        } else {
791            let res = if self.state.is_connecting() {
792                trace!("Ready to write handshake to {}.", self.peer_addr());
793                self.write_handshake()
794            } else {
795                trace!("Ready to write messages to {}.", self.peer_addr());
796
797                // Start out assuming that this write will clear the whole buffer
798
799                let len = try!(self.socket.do_write_buf(&mut self.out_buffer));
800                trace!("Wrote {} bytes to {}", len, self.peer_addr());
801                let finished = len == 0 || self.out_buffer.position() == self.out_buffer.get_ref().len() as u64;
802                if finished {
803                    match self.state {
804                        // we are are a server that is closing and just wrote out our confirming
805                        // close frame, let's disconnect
806                        FinishedClose if self.is_server()  => return Ok(self.over = true),
807                        _ => (),
808                    }
809                }
810
811                self.out_buffer.get_mut().clear();
812                self.out_buffer.set_position(0);
813
814                // Check if there is more to write so that the connection will be rescheduled
815                self.check_events()
816            };
817
818            if self.socket.is_negotiating() && res.is_ok() {
819            }
820            res
821        }
822    }
823
824    pub fn send_message(&mut self, msg: Message) -> Result<()> {
825        if self.state.is_closing() {
826            trace!("Connection is closing. Ignoring request to send message {:?} to {}.",
827                msg,
828                self.peer_addr());
829            return Ok(())
830        }
831
832        let opcode = msg.opcode();
833        trace!("Message opcode {:?}", opcode);
834        let data = msg.into_data();
835
836        if let Some(frame) = try!(self.handler.on_send_frame(Frame::message(data, opcode, true))) {
837
838            if frame.payload().len() > self.settings.fragment_size {
839                trace!("Chunking at {:?}.", self.settings.fragment_size);
840                // note this copies the data, so it's actually somewhat expensive to fragment
841                let mut chunks = frame.payload().chunks(self.settings.fragment_size).peekable();
842                let chunk = chunks.next().expect("Unable to get initial chunk!");
843
844                let mut first = Frame::message(Vec::from(chunk), opcode, false);
845
846                // Match reserved bits from original to keep extension status intact
847                first.set_rsv1(frame.has_rsv1());
848                first.set_rsv2(frame.has_rsv2());
849                first.set_rsv3(frame.has_rsv3());
850
851                try!(self.buffer_frame(first));
852
853                while let Some(chunk) = chunks.next() {
854                    if let Some(_) = chunks.peek() {
855                        try!(self.buffer_frame(
856                            Frame::message(Vec::from(chunk), OpCode::Continue, false)));
857                    } else {
858                        try!(self.buffer_frame(
859                            Frame::message(Vec::from(chunk), OpCode::Continue, true)));
860                    }
861                }
862
863            } else {
864                trace!("Sending unfragmented message frame.");
865                // true means that the message is done
866                try!(self.buffer_frame(frame));
867            }
868
869        }
870        self.check_events()
871    }
872
873    #[inline]
874    pub fn send_ping(&mut self, data: Vec<u8>) -> Result<()> {
875        if self.state.is_closing() {
876            trace!("Connection is closing. Ignoring request to send ping {:?} to {}.",
877                data,
878                self.peer_addr());
879            return Ok(())
880        }
881        trace!("Sending ping to {}.", self.peer_addr());
882
883        if let Some(frame) = try!(self.handler.on_send_frame(Frame::ping(data))) {
884            try!(self.buffer_frame(frame));
885        }
886        self.check_events()
887    }
888
889    #[inline]
890    pub fn send_pong(&mut self, data: Vec<u8>) -> Result<()> {
891        if self.state.is_closing() {
892            trace!("Connection is closing. Ignoring request to send pong {:?} to {}.",
893                data,
894                self.peer_addr());
895            return Ok(())
896        }
897        trace!("Sending pong to {}.", self.peer_addr());
898
899        if let Some(frame) = try!(self.handler.on_send_frame(Frame::pong(data))) {
900            try!(self.buffer_frame(frame));
901        }
902        self.check_events()
903    }
904
905    #[inline]
906    pub fn send_close<R>(&mut self, code: CloseCode, reason: R) -> Result<()>
907        where R: Borrow<str>
908    {
909        match self.state {
910            // We are responding to a close frame the other endpoint, when this frame goes out, we
911            // are done.
912            RespondingClose => self.state = FinishedClose,
913            // Multiple close frames are being sent from our end, ignore the later frames
914            AwaitingClose | FinishedClose => {
915                trace!("Connection is already closing. Ignoring close {:?} -- {:?} to {}.",
916                    code,
917                    reason.borrow(),
918                    self.peer_addr());
919                return self.check_events()
920            }
921            // We are initiating a closing handshake.
922            Open => self.state = AwaitingClose,
923            Connecting(_, _) => {
924                self.disconnect();
925                return self.check_events();
926                // debug_assert!(false, "Attempted to close connection while not yet open.")
927            }
928        }
929
930        trace!("Sending close {:?} -- {:?} to {}.", code, reason.borrow(), self.peer_addr());
931
932        if let Some(frame) = try!(self.handler.on_send_frame(Frame::close(code, reason.borrow()))) {
933            try!(self.buffer_frame(frame));
934        }
935
936        trace!("Connection to {} is now closing.", self.peer_addr());
937
938        self.check_events()
939    }
940
941    fn check_events(&mut self) -> Result<()> {
942        if !self.state.is_connecting() {
943            if self.out_buffer.position() < self.out_buffer.get_ref().len() as u64 {
944                return self.write();
945            }
946        }
947        Ok(())
948    }
949
950    fn buffer_frame(&mut self, mut frame: Frame) -> Result<()> {
951        try!(self.check_buffer_out(&frame));
952
953        if self.is_client() {
954            frame.set_mask();
955        }
956
957        // trace!("Buffering frame to {}:\n{}", self.peer_addr(), frame);
958
959        let pos = self.out_buffer.position();
960        try!(self.out_buffer.seek(SeekFrom::End(0)));
961        try!(frame.format(&mut self.out_buffer));
962        try!(self.out_buffer.seek(SeekFrom::Start(pos)));
963        try!(self.write());
964        Ok(())
965    }
966
967    fn check_buffer_out(&mut self, frame: &Frame) -> Result<()> {
968
969        if self.out_buffer.get_ref().capacity() <= self.out_buffer.get_ref().len() + frame.len() {
970            // extend
971            let mut new = Vec::with_capacity(self.out_buffer.get_ref().capacity());
972            new.extend(&self.out_buffer.get_ref()[self.out_buffer.position() as usize ..]);
973            if new.len() == new.capacity() {
974                if self.settings.out_buffer_grow {
975                    new.reserve(self.settings.out_buffer_capacity)
976                } else {
977                    return Err(Error::new(Kind::Capacity, "Maxed out output buffer for connection."))
978                }
979            }
980            self.out_buffer = Cursor::new(new);
981        }
982        Ok(())
983    }
984
985    fn buffer_in(&mut self) -> Result<Option<usize>> {
986        let len = try!(Self::do_read_buf(&mut self.cur_data, &mut self.in_buffer));
987        if len == 0 {
988            return Err(Error::new(Kind::CloseSingal, "Message Read Empty Size"));
989        }
990
991        if self.in_buffer.get_ref().len() == self.in_buffer.get_ref().capacity() {
992            // extend
993            let mut new = Vec::with_capacity(self.in_buffer.get_ref().capacity());
994            new.extend(&self.in_buffer.get_ref()[self.in_buffer.position() as usize ..]);
995            if new.len() == new.capacity() {
996                if !self.settings.in_buffer_grow {
997                    return Err(Error::new(Kind::Capacity, "Maxed out input buffer for connection."))
998                }
999                if new.capacity() == self.settings.in_buffer_capacity {
1000                    return Err(Error::new(Kind::Capacity, "Maxed out input buffer for connection."))
1001                }
1002                let now_len = new.len();
1003                new.reserve(::std::cmp::min(self.settings.in_buffer_capacity, now_len * 2));
1004                self.in_buffer = Cursor::new(new);
1005                // return now so that hopefully we will consume some of the buffer so this
1006                // won't happen next time
1007                trace!("Buffered {}.", len);
1008            } else {
1009                self.in_buffer = Cursor::new(new);
1010            }
1011        }
1012
1013        Ok(Some(len))
1014    }
1015}