1use std::borrow::Borrow;
2use std::collections::VecDeque;
3use std::io::{Cursor, Read, Seek, SeekFrom, Write};
4use std::mem::replace;
5use std::net::SocketAddr;
6use std::str::from_utf8;
7
8use mio::tcp::TcpStream;
9use mio::timer::Timeout;
10use mio::{Ready, Token};
11use url;
12
13#[cfg(feature = "ssl")]
14use openssl::ssl::HandshakeError;
15
16use frame::Frame;
17use handler::Handler;
18use handshake::{Handshake, Request, Response};
19use message::Message;
20use protocol::{CloseCode, OpCode};
21use result::{Error, Kind, Result};
22use stream::{Stream, TryReadBuf, TryWriteBuf};
23
24use self::Endpoint::*;
25use self::State::*;
26
27use super::Settings;
28
29#[derive(Debug)]
30pub enum State {
31 Connecting(Cursor<Vec<u8>>, Cursor<Vec<u8>>),
33 Open,
35 AwaitingClose,
36 RespondingClose,
37 FinishedClose,
38}
39
40#[derive(Debug, Eq, PartialEq, Clone)]
42pub enum Endpoint {
43 Client(url::Url),
45 Server,
47}
48
49impl State {
50 #[inline]
51 pub fn is_connecting(&self) -> bool {
52 match *self {
53 State::Connecting(..) => true,
54 _ => false,
55 }
56 }
57
58 #[allow(dead_code)]
59 #[inline]
60 pub fn is_open(&self) -> bool {
61 match *self {
62 State::Open => true,
63 _ => false,
64 }
65 }
66
67 #[inline]
68 pub fn is_closing(&self) -> bool {
69 match *self {
70 State::AwaitingClose => true,
71 State::FinishedClose => true,
72 _ => false,
73 }
74 }
75}
76
77pub struct Connection<H>
78where
79 H: Handler,
80{
81 token: Token,
82 socket: Stream,
83 state: State,
84 endpoint: Endpoint,
85 events: Ready,
86
87 fragments: VecDeque<Frame>,
88
89 in_buffer: Cursor<Vec<u8>>,
90 out_buffer: Cursor<Vec<u8>>,
91
92 handler: H,
93
94 addresses: Vec<SocketAddr>,
95
96 settings: Settings,
97 connection_id: u32,
98}
99
100impl<H> Connection<H>
101where
102 H: Handler,
103{
104 pub fn new(
105 tok: Token,
106 sock: TcpStream,
107 handler: H,
108 settings: Settings,
109 connection_id: u32,
110 ) -> Connection<H> {
111 Connection {
112 token: tok,
113 socket: Stream::tcp(sock),
114 state: Connecting(
115 Cursor::new(Vec::with_capacity(2048)),
116 Cursor::new(Vec::with_capacity(2048)),
117 ),
118 endpoint: Endpoint::Server,
119 events: Ready::empty(),
120 fragments: VecDeque::with_capacity(settings.fragments_capacity),
121 in_buffer: Cursor::new(Vec::with_capacity(settings.in_buffer_capacity)),
122 out_buffer: Cursor::new(Vec::with_capacity(settings.out_buffer_capacity)),
123 handler: handler,
124 addresses: Vec::new(),
125 settings: settings,
126 connection_id: connection_id,
127 }
128 }
129
130 pub fn as_server(&mut self) -> Result<()> {
131 Ok(self.events.insert(Ready::readable()))
132 }
133
134 pub fn as_client(&mut self, url: url::Url, addrs: Vec<SocketAddr>) -> Result<()> {
135 if let Connecting(ref mut req_buf, _) = self.state {
136 let req = self.handler.build_request(&url)?;
137 self.addresses = addrs;
138 self.events.insert(Ready::writable());
139 self.endpoint = Endpoint::Client(url);
140 req.format(req_buf.get_mut())
141 } else {
142 Err(Error::new(
143 Kind::Internal,
144 "Tried to set connection to client while not connecting.",
145 ))
146 }
147 }
148
149 #[cfg(feature = "ssl")]
150 pub fn encrypt(&mut self) -> Result<()> {
151 let sock = try!(self.socket().try_clone());
152 let ssl_stream = match self.endpoint {
153 Server => self.handler.upgrade_ssl_server(sock),
154 Client(ref url) => self.handler.upgrade_ssl_client(sock, url),
155 };
156
157 match ssl_stream {
158 Ok(stream) => Ok(self.socket = Stream::tls_live(stream)),
159 Err(Error {
160 kind: Kind::SslHandshake(handshake_err),
161 details,
162 }) => match handshake_err {
163 HandshakeError::SetupFailure(_) => {
164 Err(Error::new(Kind::SslHandshake(handshake_err), details))
165 }
166 HandshakeError::Failure(mid) | HandshakeError::Interrupted(mid) => {
167 Ok(self.socket = Stream::tls(mid))
168 }
169 },
170 Err(e) => Err(e),
171 }
172 }
173
174 pub fn token(&self) -> Token {
175 self.token
176 }
177
178 pub fn socket(&self) -> &TcpStream {
179 self.socket.evented()
180 }
181
182 pub fn connection_id(&self) -> u32 {
183 self.connection_id
184 }
185
186 fn peer_addr(&self) -> String {
187 if let Ok(addr) = self.socket.peer_addr() {
188 addr.to_string()
189 } else {
190 "UNKNOWN".into()
191 }
192 }
193
194 #[cfg(feature = "ssl")]
196 pub fn reset(&mut self) -> Result<()> {
197 if let Client(ref url) = self.endpoint {
199 if let Connecting(ref mut req, ref mut res) = self.state {
200 req.set_position(0);
201 res.set_position(0);
202 self.events.remove(Ready::readable());
203 self.events.insert(Ready::writable());
204
205 if let Some(ref addr) = self.addresses.pop() {
206 let sock = try!(TcpStream::connect(addr));
207 if self.socket.is_tls() {
208 let ssl_stream = self.handler.upgrade_ssl_client(sock, url);
209 match ssl_stream {
210 Ok(stream) => Ok(self.socket = Stream::tls_live(stream)),
211 Err(Error {
212 kind: Kind::SslHandshake(handshake_err),
213 details,
214 }) => match handshake_err {
215 HandshakeError::SetupFailure(_) => {
216 Err(Error::new(Kind::SslHandshake(handshake_err), details))
217 }
218 HandshakeError::Failure(mid) | HandshakeError::Interrupted(mid) => {
219 Ok(self.socket = Stream::tls(mid))
220 }
221 },
222 Err(e) => Err(e),
223 }
224 } else {
225 Ok(self.socket = Stream::tcp(sock))
226 }
227 } else {
228 if self.settings.panic_on_new_connection {
229 panic!("Unable to connect to server.");
230 }
231 Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
232 }
233 } else {
234 Err(Error::new(
235 Kind::Internal,
236 "Unable to reset client connection because it is active.",
237 ))
238 }
239 } else {
240 Err(Error::new(
241 Kind::Internal,
242 "Server connections cannot be reset.",
243 ))
244 }
245 }
246
247 #[cfg(not(feature = "ssl"))]
248 pub fn reset(&mut self) -> Result<()> {
249 if self.is_client() {
250 if let Connecting(ref mut req, ref mut res) = self.state {
251 req.set_position(0);
252 res.set_position(0);
253 self.events.remove(Ready::readable());
254 self.events.insert(Ready::writable());
255
256 if let Some(ref addr) = self.addresses.pop() {
257 let sock = try!(TcpStream::connect(addr));
258 Ok(self.socket = Stream::tcp(sock))
259 } else {
260 if self.settings.panic_on_new_connection {
261 panic!("Unable to connect to server.");
262 }
263 Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
264 }
265 } else {
266 Err(Error::new(
267 Kind::Internal,
268 "Unable to reset client connection because it is active.",
269 ))
270 }
271 } else {
272 Err(Error::new(
273 Kind::Internal,
274 "Server connections cannot be reset.",
275 ))
276 }
277 }
278
279 pub fn events(&self) -> Ready {
280 self.events
281 }
282
283 pub fn is_client(&self) -> bool {
284 match self.endpoint {
285 Client(_) => true,
286 Server => false,
287 }
288 }
289
290 pub fn is_server(&self) -> bool {
291 match self.endpoint {
292 Client(_) => false,
293 Server => true,
294 }
295 }
296
297 pub fn shutdown(&mut self) {
298 self.handler.on_shutdown();
299 if let Err(err) = self.send_close(CloseCode::Away, "Shutting down.") {
300 self.handler.on_error(err);
301 self.disconnect()
302 }
303 }
304
305 #[inline]
306 pub fn new_timeout(&mut self, event: Token, timeout: Timeout) -> Result<()> {
307 self.handler.on_new_timeout(event, timeout)
308 }
309
310 #[inline]
311 pub fn timeout_triggered(&mut self, event: Token) -> Result<()> {
312 self.handler.on_timeout(event)
313 }
314
315 pub fn error(&mut self, err: Error) {
316 match self.state {
317 Connecting(_, ref mut res) => match err.kind {
318 #[cfg(feature = "ssl")]
319 Kind::Ssl(_) => {
320 self.handler.on_error(err);
321 self.events = Ready::empty();
322 }
323 Kind::Io(_) => {
324 self.handler.on_error(err);
325 self.events = Ready::empty();
326 }
327 Kind::Protocol => {
328 let msg = err.to_string();
329 self.handler.on_error(err);
330 if let Server = self.endpoint {
331 res.get_mut().clear();
332 if let Err(err) =
333 write!(res.get_mut(), "HTTP/1.1 400 Bad Request\r\n\r\n{}", msg)
334 {
335 self.handler.on_error(Error::from(err));
336 self.events = Ready::empty();
337 } else {
338 self.events.remove(Ready::readable());
339 self.events.insert(Ready::writable());
340 }
341 } else {
342 self.events = Ready::empty();
343 }
344 }
345 _ => {
346 let msg = err.to_string();
347 self.handler.on_error(err);
348 if let Server = self.endpoint {
349 res.get_mut().clear();
350 if let Err(err) = write!(
351 res.get_mut(),
352 "HTTP/1.1 500 Internal Server Error\r\n\r\n{}",
353 msg
354 ) {
355 self.handler.on_error(Error::from(err));
356 self.events = Ready::empty();
357 } else {
358 self.events.remove(Ready::readable());
359 self.events.insert(Ready::writable());
360 }
361 } else {
362 self.events = Ready::empty();
363 }
364 }
365 },
366 _ => {
367 match err.kind {
368 Kind::Internal => {
369 if self.settings.panic_on_internal {
370 panic!("Panicking on internal error -- {}", err);
371 }
372 let reason = format!("{}", err);
373
374 self.handler.on_error(err);
375 if let Err(err) = self.send_close(CloseCode::Error, reason) {
376 self.handler.on_error(err);
377 self.disconnect()
378 }
379 }
380 Kind::Capacity => {
381 if self.settings.panic_on_capacity {
382 panic!("Panicking on capacity error -- {}", err);
383 }
384 let reason = format!("{}", err);
385
386 self.handler.on_error(err);
387 if let Err(err) = self.send_close(CloseCode::Size, reason) {
388 self.handler.on_error(err);
389 self.disconnect()
390 }
391 }
392 Kind::Protocol => {
393 if self.settings.panic_on_protocol {
394 panic!("Panicking on protocol error -- {}", err);
395 }
396 let reason = format!("{}", err);
397
398 self.handler.on_error(err);
399 if let Err(err) = self.send_close(CloseCode::Protocol, reason) {
400 self.handler.on_error(err);
401 self.disconnect()
402 }
403 }
404 Kind::Encoding(_) => {
405 if self.settings.panic_on_encoding {
406 panic!("Panicking on encoding error -- {}", err);
407 }
408 let reason = format!("{}", err);
409
410 self.handler.on_error(err);
411 if let Err(err) = self.send_close(CloseCode::Invalid, reason) {
412 self.handler.on_error(err);
413 self.disconnect()
414 }
415 }
416 Kind::Http(_) => {
417 self.handler.on_error(err);
419 error!("Disconnecting WebSocket.");
420 self.disconnect()
421 }
422 Kind::Custom(_) => {
423 self.handler.on_error(err);
424 }
425 Kind::Timer(_) => {
426 if self.settings.panic_on_timeout {
427 panic!("Panicking on timer failure -- {}", err);
428 }
429 self.handler.on_error(err);
430 }
431 Kind::Queue(_) => {
432 if self.settings.panic_on_queue {
433 panic!("Panicking on queue error -- {}", err);
434 }
435 self.handler.on_error(err);
436 }
437 _ => {
438 if self.settings.panic_on_io {
439 panic!("Panicking on io error -- {}", err);
440 }
441 self.handler.on_error(err);
442 self.disconnect()
443 }
444 }
445 }
446 }
447 }
448
449 pub fn disconnect(&mut self) {
450 match self.state {
451 RespondingClose | FinishedClose | Connecting(_, _) => (),
452 _ => {
453 self.handler.on_close(CloseCode::Abnormal, "");
454 }
455 }
456 self.events = Ready::empty()
457 }
458
459 pub fn consume(self) -> H {
460 self.handler
461 }
462
463 fn write_handshake(&mut self) -> Result<()> {
464 if let Connecting(ref mut req, ref mut res) = self.state {
465 match self.endpoint {
466 Server => {
467 let mut done = false;
468 if let Some(_) = try!(self.socket.try_write_buf(res)) {
469 if res.position() as usize == res.get_ref().len() {
470 done = true
471 }
472 }
473 if !done {
474 return Ok(());
475 }
476 }
477 Client(_) => {
478 if let Some(_) = try!(self.socket.try_write_buf(req)) {
479 if req.position() as usize == req.get_ref().len() {
480 trace!(
481 "Finished writing handshake request to {}",
482 self.socket
483 .peer_addr()
484 .map(|addr| addr.to_string())
485 .unwrap_or("UNKNOWN".into())
486 );
487 self.events.insert(Ready::readable());
488 self.events.remove(Ready::writable());
489 }
490 }
491 return Ok(());
492 }
493 }
494 }
495
496 if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
497 trace!(
498 "Finished writing handshake response to {}",
499 self.peer_addr()
500 );
501
502 let request = match Request::parse(req.get_ref()) {
503 Ok(Some(req)) => req,
504 _ => {
505 self.state = FinishedClose;
508 self.events = Ready::empty();
509 return Ok(());
510 }
511 };
512
513 let response = try!(try!(Response::parse(res.get_ref())).ok_or(Error::new(
514 Kind::Internal,
515 "Failed to parse response after handshake is complete."
516 )));
517
518 if response.status() != 101 {
519 self.events = Ready::empty();
520 return Ok(());
521 } else {
522 try!(self.handler.on_open(Handshake {
523 request: request,
524 response: response,
525 peer_addr: self.socket.peer_addr().ok(),
526 local_addr: self.socket.local_addr().ok(),
527 }));
528 debug!("Connection to {} is now open.", self.peer_addr());
529 self.events.insert(Ready::readable());
530 return Ok(self.check_events());
531 }
532 } else {
533 Err(Error::new(
534 Kind::Internal,
535 "Tried to write WebSocket handshake while not in connecting state!",
536 ))
537 }
538 }
539
540 fn read_handshake(&mut self) -> Result<()> {
541 if let Connecting(ref mut req, ref mut res) = self.state {
542 match self.endpoint {
543 Server => {
544 if let Some(_) = try!(self.socket.try_read_buf(req.get_mut())) {
545 if let Some(ref request) = try!(Request::parse(req.get_ref())) {
546 trace!("Handshake request received: \n{}", request);
547 let response = try!(self.handler.on_request(request));
548 try!(response.format(res.get_mut()));
549 self.events.remove(Ready::readable());
550 self.events.insert(Ready::writable());
551 }
552 }
553 return Ok(());
554 }
555 Client(_) => {
556 if let Some(_) = try!(self.socket.try_read_buf(res.get_mut())) {
557 let end = {
559 let data = res.get_ref();
560 let end = data
561 .iter()
562 .enumerate()
563 .take_while(|&(ind, _)| !data[..ind].ends_with(b"\r\n\r\n"))
564 .count();
565 if !data[..end].ends_with(b"\r\n\r\n") {
566 return Ok(());
567 }
568 self.in_buffer.get_mut().extend(&data[end..]);
569 end
570 };
571 res.get_mut().truncate(end);
572 }
573 }
574 }
575 }
576
577 if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
578 trace!(
579 "Finished reading handshake response from {}",
580 self.peer_addr()
581 );
582
583 let request = try!(try!(Request::parse(req.get_ref())).ok_or(Error::new(
584 Kind::Internal,
585 "Failed to parse request after handshake is complete."
586 )));
587
588 let response = try!(try!(Response::parse(res.get_ref())).ok_or(Error::new(
589 Kind::Internal,
590 "Failed to parse response after handshake is complete."
591 )));
592
593 trace!("Handshake response received: \n{}", response);
594
595 if response.status() != 101 {
596 if response.status() != 301 && response.status() != 302 {
597 return Err(Error::new(Kind::Protocol, "Handshake failed."));
598 } else {
599 return Ok(());
600 }
601 }
602
603 if self.settings.key_strict {
604 let req_key = try!(request.hashed_key());
605 let res_key = try!(from_utf8(try!(response.key())));
606 if req_key != res_key {
607 return Err(Error::new(
608 Kind::Protocol,
609 format!(
610 "Received incorrect WebSocket Accept key: {} vs {}",
611 req_key, res_key
612 ),
613 ));
614 }
615 }
616
617 try!(self.handler.on_response(&response));
618 try!(self.handler.on_open(Handshake {
619 request: request,
620 response: response,
621 peer_addr: self.socket.peer_addr().ok(),
622 local_addr: self.socket.local_addr().ok(),
623 }));
624
625 if !self.in_buffer.get_ref().is_empty() {
627 try!(self.read_frames());
628 }
629
630 return Ok(self.check_events());
631 }
632 Err(Error::new(
633 Kind::Internal,
634 "Tried to read WebSocket handshake while not in connecting state!",
635 ))
636 }
637
638 pub fn read(&mut self) -> Result<()> {
639 if self.socket.is_negotiating() {
640 trace!("Performing TLS negotiation on {}.", self.peer_addr());
641 try!(self.socket.clear_negotiating());
642 self.write()
643 } else {
644 let res = if self.state.is_connecting() {
645 trace!("Ready to read handshake from {}.", self.peer_addr());
646 self.read_handshake()
647 } else {
648 trace!("Ready to read messages from {}.", self.peer_addr());
649 while let Some(len) = try!(self.buffer_in()) {
650 try!(self.read_frames());
651 if len == 0 {
652 if self.events.is_writable() {
653 self.events.remove(Ready::readable());
654 } else {
655 self.disconnect()
656 }
657 break;
658 }
659 }
660 Ok(())
661 };
662
663 if self.socket.is_negotiating() && res.is_ok() {
664 self.events.remove(Ready::readable());
665 self.events.insert(Ready::writable());
666 }
667 res
668 }
669 }
670
671 fn read_frames(&mut self) -> Result<()> {
672 while let Some(mut frame) = try!(Frame::parse(&mut self.in_buffer)) {
673 match self.state {
674 RespondingClose | FinishedClose => continue,
676 _ => (),
677 }
678
679 if self.settings.masking_strict {
680 if frame.is_masked() {
681 if self.is_client() {
682 return Err(Error::new(
683 Kind::Protocol,
684 "Received masked frame from a server endpoint.",
685 ));
686 }
687 } else {
688 if self.is_server() {
689 return Err(Error::new(
690 Kind::Protocol,
691 "Received unmasked frame from a client endpoint.",
692 ));
693 }
694 }
695 }
696
697 frame.remove_mask();
699
700 if let Some(frame) = try!(self.handler.on_frame(frame)) {
701 if frame.is_final() {
702 match frame.opcode() {
703 OpCode::Text => {
705 trace!("Received text frame {:?}", frame);
706 if !self.fragments.is_empty() {
709 return Err(Error::new(Kind::Protocol, "Received unfragmented text frame while processing fragmented message."));
710 }
711 let msg = Message::text(try!(String::from_utf8(frame.into_data())
712 .map_err(|err| err.utf8_error())));
713 try!(self.handler.on_message(msg));
714 }
715 OpCode::Binary => {
716 trace!("Received binary frame {:?}", frame);
717 if !self.fragments.is_empty() {
720 return Err(Error::new(Kind::Protocol, "Received unfragmented binary frame while processing fragmented message."));
721 }
722 let data = frame.into_data();
723 try!(self.handler.on_message(Message::binary(data)));
724 }
725 OpCode::Close => {
727 trace!("Received close frame {:?}", frame);
728 if self.state.is_closing() {
730 if self.is_server() {
731 self.events = Ready::empty()
733 } else {
734 }
737 } else {
738 self.state = RespondingClose;
740 }
741
742 let mut close_code = [0u8; 2];
743 let mut data = Cursor::new(frame.into_data());
744 if let 2 = try!(data.read(&mut close_code)) {
745 let raw_code: u16 =
746 (close_code[0] as u16) << 8 | (close_code[1] as u16);
747 trace!(
748 "Connection to {} received raw close code: {:?}, {:?}",
749 self.peer_addr(),
750 raw_code,
751 close_code
752 );
753 let named = CloseCode::from(raw_code);
754 if let CloseCode::Other(code) = named {
755 if code < 1000 ||
756 code >= 5000 ||
757 code == 1004 ||
758 code == 1014 ||
759 code == 1016 || code == 1100 || code == 2000 ||
762 code == 2999
763 {
764 return Err(Error::new(
765 Kind::Protocol,
766 format!(
767 "Received invalid close code from endpoint: {}",
768 code
769 ),
770 ));
771 }
772 }
773 let has_reason = {
774 if let Ok(reason) = from_utf8(&data.get_ref()[2..]) {
775 self.handler.on_close(named, reason); true
777 } else {
778 self.handler.on_close(named, "");
779 false
780 }
781 };
782
783 if let CloseCode::Abnormal = named {
784 return Err(Error::new(
785 Kind::Protocol,
786 "Received abnormal close code from endpoint.",
787 ));
788 } else if let CloseCode::Status = named {
789 return Err(Error::new(
790 Kind::Protocol,
791 "Received no status close code from endpoint.",
792 ));
793 } else if let CloseCode::Restart = named {
794 return Err(Error::new(
795 Kind::Protocol,
796 "Restart close code is not supported.",
797 ));
798 } else if let CloseCode::Again = named {
799 return Err(Error::new(
800 Kind::Protocol,
801 "Try again later close code is not supported.",
802 ));
803 } else if let CloseCode::Tls = named {
804 return Err(Error::new(
805 Kind::Protocol,
806 "Received TLS close code outside of TLS handshake.",
807 ));
808 } else {
809 if !self.state.is_closing() {
810 if has_reason {
811 try!(self.send_close(named, "")); } else {
813 try!(self.send_close(CloseCode::Invalid, ""));
814 }
815 } else {
816 self.state = FinishedClose;
817 }
818 }
819 } else {
820 self.handler.on_close(CloseCode::Status, "");
825 if !self.state.is_closing() {
826 try!(self.send_close(CloseCode::Empty, ""));
827 } else {
828 self.state = FinishedClose;
829 }
830 }
831 }
832 OpCode::Ping => {
833 trace!("Received ping frame {:?}", frame);
834 try!(self.send_pong(frame.into_data()));
835 }
836 OpCode::Pong => {
837 trace!("Received pong frame {:?}", frame);
838 }
840 OpCode::Continue => {
842 trace!("Received final fragment {:?}", frame);
843 if let Some(first) = self.fragments.pop_front() {
844 let size = self.fragments.iter().fold(
845 first.payload().len() + frame.payload().len(),
846 |len, frame| len + frame.payload().len(),
847 );
848 match first.opcode() {
849 OpCode::Text => {
850 trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
851 let mut data = Vec::with_capacity(size);
852 data.extend(first.into_data());
853 while let Some(frame) = self.fragments.pop_front() {
854 data.extend(frame.into_data());
855 }
856 data.extend(frame.into_data());
857
858 let string =
859 try!(String::from_utf8(data)
860 .map_err(|err| err.utf8_error()));
861
862 trace!(
863 "Calling handler with constructed message: {:?}",
864 string
865 );
866 try!(self.handler.on_message(Message::text(string)));
867 }
868 OpCode::Binary => {
869 trace!("Constructing binary message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
870 let mut data = Vec::with_capacity(size);
871 data.extend(first.into_data());
872
873 while let Some(frame) = self.fragments.pop_front() {
874 data.extend(frame.into_data());
875 }
876
877 data.extend(frame.into_data());
878
879 trace!(
880 "Calling handler with constructed message: {:?}",
881 data
882 );
883 try!(self.handler.on_message(Message::binary(data)));
884 }
885 _ => {
886 return Err(Error::new(
887 Kind::Protocol,
888 "Encounted fragmented control frame.",
889 ))
890 }
891 }
892 } else {
893 return Err(Error::new(
894 Kind::Protocol,
895 "Unable to reconstruct fragmented message. No first frame.",
896 ));
897 }
898 }
899 _ => return Err(Error::new(Kind::Protocol, "Encountered invalid opcode.")),
900 }
901 } else {
902 if frame.is_control() {
903 return Err(Error::new(
904 Kind::Protocol,
905 "Encounted fragmented control frame.",
906 ));
907 } else {
908 trace!("Received non-final fragment frame {:?}", frame);
909 if !self.settings.fragments_grow
910 && self.settings.fragments_capacity == self.fragments.len()
911 {
912 return Err(Error::new(Kind::Capacity, "Exceeded max fragments."));
913 } else {
914 self.fragments.push_back(frame)
915 }
916 }
917 }
918 }
919 }
920 Ok(())
921 }
922
923 pub fn write(&mut self) -> Result<()> {
924 if self.socket.is_negotiating() {
925 trace!("Performing TLS negotiation on {}.", self.peer_addr());
926 try!(self.socket.clear_negotiating());
927 self.read()
928 } else {
929 let res = if self.state.is_connecting() {
930 trace!("Ready to write handshake to {}.", self.peer_addr());
931 self.write_handshake()
932 } else {
933 trace!("Ready to write messages to {}.", self.peer_addr());
934
935 self.events.remove(Ready::writable());
937
938 if let Some(len) = try!(self.socket.try_write_buf(&mut self.out_buffer)) {
939 trace!("Wrote {} bytes to {}", len, self.peer_addr());
940 let finished = len == 0
941 || self.out_buffer.position() == self.out_buffer.get_ref().len() as u64;
942 if finished {
943 match self.state {
944 FinishedClose if self.is_server() => {
947 return Ok(self.events = Ready::empty())
948 }
949 _ => (),
950 }
951 }
952 }
953
954 Ok(self.check_events())
956 };
957
958 if self.socket.is_negotiating() && res.is_ok() {
959 self.events.remove(Ready::writable());
960 self.events.insert(Ready::readable());
961 }
962 res
963 }
964 }
965
966 pub fn send_message(&mut self, msg: Message) -> Result<()> {
967 if self.state.is_closing() {
968 trace!(
969 "Connection is closing. Ignoring request to send message {:?} to {}.",
970 msg,
971 self.peer_addr()
972 );
973 return Ok(());
974 }
975
976 let opcode = msg.opcode();
977 trace!("Message opcode {:?}", opcode);
978 let data = msg.into_data();
979
980 if let Some(frame) = try!(self
981 .handler
982 .on_send_frame(Frame::message(data, opcode, true)))
983 {
984 if frame.payload().len() > self.settings.fragment_size {
985 trace!("Chunking at {:?}.", self.settings.fragment_size);
986 let mut chunks = frame
988 .payload()
989 .chunks(self.settings.fragment_size)
990 .peekable();
991 let chunk = chunks.next().expect("Unable to get initial chunk!");
992
993 let mut first = Frame::message(Vec::from(chunk), opcode, false);
994
995 first.set_rsv1(frame.has_rsv1());
997 first.set_rsv2(frame.has_rsv2());
998 first.set_rsv3(frame.has_rsv3());
999
1000 try!(self.buffer_frame(first));
1001
1002 while let Some(chunk) = chunks.next() {
1003 if let Some(_) = chunks.peek() {
1004 try!(self.buffer_frame(Frame::message(
1005 Vec::from(chunk),
1006 OpCode::Continue,
1007 false
1008 )));
1009 } else {
1010 try!(self.buffer_frame(Frame::message(
1011 Vec::from(chunk),
1012 OpCode::Continue,
1013 true
1014 )));
1015 }
1016 }
1017 } else {
1018 trace!("Sending unfragmented message frame.");
1019 try!(self.buffer_frame(frame));
1021 }
1022 }
1023 Ok(self.check_events())
1024 }
1025
1026 #[inline]
1027 pub fn send_ping(&mut self, data: Vec<u8>) -> Result<()> {
1028 if self.state.is_closing() {
1029 trace!(
1030 "Connection is closing. Ignoring request to send ping {:?} to {}.",
1031 data,
1032 self.peer_addr()
1033 );
1034 return Ok(());
1035 }
1036 trace!("Sending ping to {}.", self.peer_addr());
1037
1038 if let Some(frame) = try!(self.handler.on_send_frame(Frame::ping(data))) {
1039 try!(self.buffer_frame(frame));
1040 }
1041 Ok(self.check_events())
1042 }
1043
1044 #[inline]
1045 pub fn send_pong(&mut self, data: Vec<u8>) -> Result<()> {
1046 if self.state.is_closing() {
1047 trace!(
1048 "Connection is closing. Ignoring request to send pong {:?} to {}.",
1049 data,
1050 self.peer_addr()
1051 );
1052 return Ok(());
1053 }
1054 trace!("Sending pong to {}.", self.peer_addr());
1055
1056 if let Some(frame) = try!(self.handler.on_send_frame(Frame::pong(data))) {
1057 try!(self.buffer_frame(frame));
1058 }
1059 Ok(self.check_events())
1060 }
1061
1062 #[inline]
1063 pub fn send_close<R>(&mut self, code: CloseCode, reason: R) -> Result<()>
1064 where
1065 R: Borrow<str>,
1066 {
1067 match self.state {
1068 RespondingClose => self.state = FinishedClose,
1071 AwaitingClose | FinishedClose => {
1073 trace!(
1074 "Connection is already closing. Ignoring close {:?} -- {:?} to {}.",
1075 code,
1076 reason.borrow(),
1077 self.peer_addr()
1078 );
1079 return Ok(self.check_events());
1080 }
1081 Open => self.state = AwaitingClose,
1083 Connecting(_, _) => {
1084 debug_assert!(false, "Attempted to close connection while not yet open.")
1085 }
1086 }
1087
1088 trace!(
1089 "Sending close {:?} -- {:?} to {}.",
1090 code,
1091 reason.borrow(),
1092 self.peer_addr()
1093 );
1094
1095 if let Some(frame) = try!(self
1096 .handler
1097 .on_send_frame(Frame::close(code, reason.borrow())))
1098 {
1099 try!(self.buffer_frame(frame));
1100 }
1101
1102 trace!("Connection to {} is now closing.", self.peer_addr());
1103
1104 Ok(self.check_events())
1105 }
1106
1107 fn check_events(&mut self) {
1108 if !self.state.is_connecting() {
1109 self.events.insert(Ready::readable());
1110 if self.out_buffer.position() < self.out_buffer.get_ref().len() as u64 {
1111 self.events.insert(Ready::writable());
1112 }
1113 }
1114 }
1115
1116 fn buffer_frame(&mut self, mut frame: Frame) -> Result<()> {
1117 try!(self.check_buffer_out(&frame));
1118
1119 if self.is_client() {
1120 frame.set_mask();
1121 }
1122
1123 trace!("Buffering frame to {}:\n{}", self.peer_addr(), frame);
1124
1125 let pos = self.out_buffer.position();
1126 try!(self.out_buffer.seek(SeekFrom::End(0)));
1127 try!(frame.format(&mut self.out_buffer));
1128 try!(self.out_buffer.seek(SeekFrom::Start(pos)));
1129 Ok(())
1130 }
1131
1132 fn check_buffer_out(&mut self, frame: &Frame) -> Result<()> {
1133 if self.out_buffer.get_ref().capacity() <= self.out_buffer.get_ref().len() + frame.len() {
1134 let mut new = Vec::with_capacity(self.out_buffer.get_ref().capacity());
1136 new.extend(&self.out_buffer.get_ref()[self.out_buffer.position() as usize..]);
1137 if new.len() == new.capacity() {
1138 if self.settings.out_buffer_grow {
1139 new.reserve(self.settings.out_buffer_capacity)
1140 } else {
1141 return Err(Error::new(
1142 Kind::Capacity,
1143 "Maxed out output buffer for connection.",
1144 ));
1145 }
1146 }
1147 self.out_buffer = Cursor::new(new);
1148 }
1149 Ok(())
1150 }
1151
1152 fn buffer_in(&mut self) -> Result<Option<usize>> {
1153 trace!("Reading buffer for connection to {}.", self.peer_addr());
1154 if let Some(len) = try!(self.socket.try_read_buf(self.in_buffer.get_mut())) {
1155 trace!("Buffered {}.", len);
1156 if self.in_buffer.get_ref().len() == self.in_buffer.get_ref().capacity() {
1157 let mut new = Vec::with_capacity(self.in_buffer.get_ref().capacity());
1159 new.extend(&self.in_buffer.get_ref()[self.in_buffer.position() as usize..]);
1160 if new.len() == new.capacity() {
1161 if self.settings.in_buffer_grow {
1162 if new.capacity() >= self.settings.max_in_buffer {
1163 return Err(Error::new(
1164 Kind::Capacity,
1165 "Maxed out input buffer for connection.",
1166 ));
1167 }
1168 new.reserve(self.settings.in_buffer_capacity);
1169 } else {
1170 return Err(Error::new(
1171 Kind::Capacity,
1172 "Maxed out input buffer for connection.",
1173 ));
1174 }
1175 }
1176 self.in_buffer = Cursor::new(new);
1177 }
1178 Ok(Some(len))
1179 } else {
1180 Ok(None)
1181 }
1182 }
1183}