1#[warn(dead_code)]
2
3use std::mem::replace;
4use std::mem::transmute;
5use std::borrow::Borrow;
6use std::io::{Write, Read, Cursor, Seek, SeekFrom};
7use std::net::SocketAddr;
8use std::net::TcpStream;
9use std::collections::VecDeque;
10use std::str::from_utf8;
11
12use time;
13use url;
14#[cfg(feature="ssl")]
15use openssl::ssl::SslStream;
16
17use message::Message;
18use handshake::{Handshake, Request, Response};
19use frame::Frame;
20use protocol::{CloseCode, OpCode};
21use result::{Result, Error, Kind};
22use handler::Handler;
23use stream::Stream;
24use Settings;
25
26use self::State::*;
27use self::Endpoint::*;
28
29#[derive(Debug)]
30pub enum State {
31 Connecting(Cursor<Vec<u8>>, Cursor<Vec<u8>>),
33 Open,
35 AwaitingClose,
36 RespondingClose,
37 FinishedClose,
38}
39
40#[derive(Debug, Eq, PartialEq, Clone, Copy)]
42pub enum Endpoint {
43 Client,
45 Server,
47}
48
49impl State {
50
51 #[inline]
52 pub fn is_connecting(&self) -> bool {
53 match *self {
54 State::Connecting(..) => true,
55 _ => false,
56 }
57 }
58
59 #[allow(dead_code)]
60 #[inline]
61 pub fn is_open(&self) -> bool {
62 match *self {
63 State::Open => true,
64 _ => false,
65 }
66 }
67
68 #[inline]
69 pub fn is_closing(&self) -> bool {
70 match *self {
71 State::AwaitingClose => true,
72 State::FinishedClose => true,
73 _ => false,
74 }
75 }
76}
77
78pub struct Connection<Handler>
79{
80 socket: Stream,
81 state: State,
82 endpoint: Endpoint,
83
84 fragments: VecDeque<Frame>,
85
86 in_buffer: Cursor<Vec<u8>>,
87 out_buffer: Cursor<Vec<u8>>,
88
89 cur_data: Cursor<Vec<u8>>,
90
91 addresses: Vec<SocketAddr>,
92
93 settings: Settings,
94 handler: Handler,
95
96 read_time: u64,
97
98 over: bool,
99}
100
101impl<H> Connection<H>
102 where H: Handler
103{
104 pub fn new(sock: TcpStream, handler: H, settings: Settings) -> Connection<H> {
105 Connection {
106 socket: Stream::tcp(sock),
107 state: Connecting(
108 Cursor::new(Vec::with_capacity(2048)),
109 Cursor::new(Vec::with_capacity(2048)),
110 ),
111 endpoint: Endpoint::Server,
112 fragments: VecDeque::with_capacity(settings.fragments_capacity),
113 in_buffer: Cursor::new(Vec::with_capacity(204800)),
114 out_buffer: Cursor::new(Vec::with_capacity(204800)),
115 cur_data: Cursor::new(Vec::with_capacity(204800)),
116 handler: handler,
117 addresses: Vec::new(),
118 settings: settings,
119 read_time: time::precise_time_ns() / 1000_000,
120 over: false,
121 }
122 }
123
124 pub fn as_server(&mut self) -> Result<()> {
125 Ok(())
126 }
127
128 pub fn as_client(&mut self, url: &url::Url, addrs: Vec<SocketAddr>) -> Result<()> {
129 if let Connecting(ref mut req, _) = self.state {
130 self.addresses = addrs;
131 self.endpoint = Endpoint::Client;
132 try!(self.handler.build_request(url)).format(req.get_mut())
133 } else {
134 Err(Error::new(
135 Kind::Internal,
136 "Tried to set connection to client while not connecting."))
137 }
138 }
139
140 #[cfg(feature="ssl")]
141 pub fn encrypt(&mut self) -> Result<()> {
142 let ssl_stream = match self.endpoint {
143 Server => try!(SslStream::accept(
144 try!(self.handler.build_ssl()),
145 try!(self.socket().try_clone()))),
146
147 Client => try!(SslStream::connect(
148 try!(self.handler.build_ssl()),
149 try!(self.socket().try_clone()))),
150 };
151
152 Ok(self.socket = Stream::tls(ssl_stream))
153 }
154
155 pub fn socket(&self) -> &TcpStream {
156 &self.socket.evented()
157 }
158
159 fn peer_addr(&self) -> String {
160 if let Ok(addr) = self.socket.peer_addr() {
161 addr.to_string()
162 } else {
163 "UNKNOWN".into()
164 }
165 }
166
167 #[cfg(feature="ssl")]
169 pub fn reset(&mut self) -> Result<()> {
170 if self.is_client() {
171 if let Connecting(ref mut req, ref mut res) = self.state {
172 req.set_position(0);
173 res.set_position(0);
174
175 if let Some(ref addr) = self.addresses.pop() {
176 let sock = try!(TcpStream::connect(addr));
177 if self.socket.is_tls() {
178 Ok(self.socket = Stream::tls(
179 try!(SslStream::connect(
180 try!(self.handler.build_ssl()),
181 sock))))
182
183 } else {
184 Ok(self.socket = Stream::tcp(sock))
185 }
186 } else {
187 if self.settings.panic_on_new_connection {
188 panic!("Unable to connect to server.");
189 }
190 Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
191 }
192 } else {
193 Err(Error::new(Kind::Internal, "Unable to reset client connection because it is active."))
194 }
195 } else {
196 Err(Error::new(Kind::Internal, "Server connections cannot be reset."))
197 }
198 }
199
200 #[cfg(not(feature="ssl"))]
201 pub fn reset(&mut self) -> Result<()> {
202 if self.is_client() {
203 if let Connecting(ref mut req, ref mut res) = self.state {
204 req.set_position(0);
205 res.set_position(0);
206
207 if let Some(ref addr) = self.addresses.pop() {
208 let sock = try!(TcpStream::connect(addr));
209 Ok(self.socket = Stream::tcp(sock))
210 } else {
211 if self.settings.panic_on_new_connection {
212 panic!("Unable to connect to server.");
213 }
214 Err(Error::new(Kind::Internal, "Exhausted possible addresses."))
215 }
216 } else {
217 Err(Error::new(Kind::Internal, "Unable to reset client connection because it is active."))
218 }
219 } else {
220 Err(Error::new(Kind::Internal, "Server connections cannot be reset."))
221 }
222 }
223
224 pub fn set_read_time(&mut self, read_time: u64) {
225 self.read_time = read_time
226 }
227
228 pub fn get_read_time(&self) -> u64 {
229 self.read_time
230 }
231
232 pub fn is_over(&self) -> bool {
233 match self.state {
234 State::FinishedClose => return true,
235 _ => (),
236 }
237 self.over
238 }
239
240 pub fn is_closing(&self) -> bool {
241 self.state.is_closing()
242 }
243
244 pub fn is_client(&self) -> bool {
245 match self.endpoint {
246 Client => true,
247 Server => false,
248 }
249 }
250
251 pub fn is_server(&self) -> bool {
252 match self.endpoint {
253 Client => false,
254 Server => true,
255 }
256 }
257
258 pub fn shutdown(&mut self) {
259 self.handler.on_shutdown();
260 if let Err(err) = self.send_close(CloseCode::Away, "Shutting down.") {
261 self.handler.on_error(err);
262 self.disconnect()
263 }
264 }
265
266 pub fn error(&mut self, err: Error) {
267 match self.state {
268 Connecting(_, ref mut res) => {
269 match err.kind {
270 #[cfg(feature="ssl")]
271 Kind::Ssl(_) | Kind::Io(_) => {
272 self.handler.on_error(err);
273 self.over = true
274 }
275 Kind::Protocol => {
276 let msg = err.to_string();
277 self.handler.on_error(err);
278 if let Server = self.endpoint {
279 res.get_mut().clear();
280
281 if let Err(err) = write!(
282 res.get_mut(),
283 "HTTP/1.1 400 Bad Request\r\n\r\n{}", msg)
284 {
285 self.handler.on_error(Error::from(err));
286 }
287 }
288 self.over = true
289 }
290 _ => {
291 let msg = err.to_string();
292 self.handler.on_error(err);
293 if let Server = self.endpoint {
294 res.get_mut().clear();
295 if let Err(err) = write!(
296 res.get_mut(),
297 "HTTP/1.1 500 Internal Server Error\r\n\r\n{}", msg) {
298 self.handler.on_error(Error::from(err));
299 }
300 }
301 self.over = true
302 }
303 }
304
305 }
306 _ => {
307 match err.kind {
308 Kind::Internal => {
309 if self.settings.panic_on_internal {
310 panic!("Panicking on internal error -- {}", err);
311 }
312 let reason = format!("{}", err);
313
314 self.handler.on_error(err);
315 if let Err(err) = self.send_close(CloseCode::Error, reason) {
316 self.handler.on_error(err);
317 self.disconnect()
318 }
319 }
320 Kind::Capacity => {
321 if self.settings.panic_on_capacity {
322 panic!("Panicking on capacity error -- {}", err);
323 }
324 let reason = format!("{}", err);
325
326 self.handler.on_error(err);
327 if let Err(err) = self.send_close(CloseCode::Size, reason) {
328 self.handler.on_error(err);
329 self.disconnect()
330 }
331 }
332 Kind::Protocol => {
333 if self.settings.panic_on_protocol {
334 panic!("Panicking on protocol error -- {}", err);
335 }
336 let reason = format!("{}", err);
337
338 self.handler.on_error(err);
339 if let Err(err) = self.send_close(CloseCode::Protocol, reason) {
340 self.handler.on_error(err);
341 self.disconnect()
342 }
343 }
344 Kind::Encoding(_) => {
345 if self.settings.panic_on_encoding {
346 panic!("Panicking on encoding error -- {}", err);
347 }
348 let reason = format!("{}", err);
349
350 self.handler.on_error(err);
351 if let Err(err) = self.send_close(CloseCode::Invalid, reason) {
352 self.handler.on_error(err);
353 self.disconnect()
354 }
355 }
356 Kind::Http(_) => {
357 self.handler.on_error(err);
359 error!("Disconnecting WebSocket.");
360 self.disconnect()
361 }
362 Kind::Custom(_) => {
363 self.handler.on_error(err);
364 }
365 _ => {
366 if self.settings.panic_on_io {
367 panic!("Panicking on io error -- {}", err);
368 }
369 self.handler.on_error(err);
370 self.disconnect()
371 }
372 }
373 }
374 }
375 }
376
377 pub fn disconnect(&mut self) {
378 match self.state {
379 AwaitingClose | RespondingClose | FinishedClose | Connecting(_, _)=> (),
380 _ => {
381 self.handler.on_close(CloseCode::Abnormal, "");
382 }
383 }
384 self.over = true;
385 }
386
387 pub fn consume(self) -> H {
388 self.handler
389 }
390
391 fn write_handshake(&mut self) -> Result<()> {
392 if let Connecting(ref mut req, ref mut res) = self.state {
393 match self.endpoint {
394 Server => {
395 let _ = try!(self.socket.do_write_buf(res));
396 }
397 Client => {
398 let _ = try!(self.socket.do_write_buf(req));
399 trace!("Finished writing handshake request to {}",
400 self.socket
401 .peer_addr()
402 .map(|addr| addr.to_string())
403 .unwrap_or("UNKNOWN".into()));
404 return Ok(())
405 }
406 }
407 }
408
409 if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
410 trace!("Finished writing handshake response to {}", self.peer_addr());
411
412 let request = match Request::parse(req.get_ref()) {
413 Ok(Some(req)) => req,
414 _ => {
415 self.state = FinishedClose;
418 self.over = true;
419 return Ok(())
420 }
421 };
422
423 let response = try!(try!(Response::parse(res.get_ref())).ok_or(
424 Error::new(Kind::Internal, "Failed to parse response after handshake is complete.")));
425
426 if response.status() != 101 {
427 self.over = true;
428 return Ok(())
429 } else {
430 try!(self.handler.on_open(Handshake {
431 request: request,
432 response: response,
433 peer_addr: self.socket.peer_addr().ok(),
434 local_addr: self.socket.local_addr().ok(),
435 }));
436 debug!("Connection to {} is now open.", self.peer_addr());
437 return self.check_events()
438 }
439 } else {
440 Err(Error::new(Kind::Internal, "Tried to write WebSocket handshake while not in connecting state!"))
441 }
442 }
443
444 pub fn new_data_received(&mut self, data: &[u8]) {
445 let _ = self.cur_data.write(data);
446 }
447
448 fn do_read_buf(cur_data: &mut Cursor<Vec<u8>>, val: &mut Cursor<Vec<u8>>) -> Result<usize> {
449 let position = val.position();
450 let len = {
451 let data = cur_data.get_mut();
452 let len = data.len();
453 let _ = try!(val.write(&data[..]));
454 data.clear();
455 len
456 };
457 cur_data.set_position(0);
458 val.set_position(position);
459 Ok(len)
460 }
461
462 fn read_handshake(&mut self) -> Result<()> {
463 let mut is_need_write = false;
464 let mut is_need_return = false;
465 if let Connecting(ref mut req, ref mut res) = self.state {
466 match self.endpoint {
467 Server => {
468 let len = try!(Self::do_read_buf(&mut self.cur_data, req));
469 if len == 0 {
470 return Err(Error::new(Kind::CloseSingal, "Connect Read Empty Size"));
471 }
472 if let Some(ref request) = try!(Request::parse(req.get_ref())) {
473 trace!("Handshake request received: \n{}", request);
474 let response = try!(self.handler.on_request(request));
475 try!(response.format(res.get_mut()));
476 is_need_write = true;
477 }
478 is_need_return = true;
479 }
480 Client => {
481 let len = try!(Self::do_read_buf(&mut self.cur_data, res));
482 if len == 0 {
483 return Err(Error::new(Kind::CloseSingal, "Connect Read Empty Size"));
484 }
485 let end = {
487 let data = res.get_ref();
488 let end = data.iter()
489 .enumerate()
490 .take_while(|&(ind, _)| !data[..ind].ends_with(b"\r\n\r\n"))
491 .count();
492 if !data[..end].ends_with(b"\r\n\r\n") {
493 return Ok(())
494 }
495 self.in_buffer.get_mut().extend(&data[end..]);
496 end
497 };
498 res.get_mut().truncate(end);
499 }
500 }
501 }
502
503 if is_need_write {
504 try!(self.write());
505 }
506
507 if is_need_return {
508 return Ok(());
509 }
510
511 if let Connecting(ref req, ref res) = replace(&mut self.state, Open) {
512 trace!("Finished reading handshake response from {}", self.peer_addr());
513
514 let request = try!(try!(Request::parse(req.get_ref())).ok_or(
515 Error::new(Kind::Internal, "Failed to parse request after handshake is complete.")));
516
517 let response = try!(try!(Response::parse(res.get_ref())).ok_or(
518 Error::new(Kind::Internal, "Failed to parse response after handshake is complete.")));
519
520 trace!("Handshake response received: \n{}", response);
521
522 if response.status() != 101 {
523 if response.status() != 301 && response.status() != 302 {
524 return Err(Error::new(Kind::Protocol, "Handshake failed."));
525 } else {
526 return Ok(())
527 }
528 }
529
530 if self.settings.key_strict {
531 let req_key = try!(request.hashed_key());
532 let res_key = try!(from_utf8(try!(response.key())));
533 if req_key != res_key {
534 return Err(Error::new(Kind::Protocol, format!("Received incorrect WebSocket Accept key: {} vs {}", req_key, res_key)));
535 }
536 }
537
538 try!(self.handler.on_response(&response));
539 try!(self.handler.on_open(Handshake {
540 request: request,
541 response: response,
542 peer_addr: self.socket.peer_addr().ok(),
543 local_addr: self.socket.local_addr().ok(),
544 }));
545
546 if !self.in_buffer.get_ref().is_empty() {
548 try!(self.read_frames());
549 }
550
551 return self.check_events()
552 }
553 Err(Error::new(Kind::Internal, "Tried to read WebSocket handshake while not in connecting state!"))
554 }
555
556 pub fn read(&mut self) -> Result<()> {
557 if self.socket.is_negotiating() {
558 trace!("Performing TLS negotiation on {}.", self.peer_addr());
559 try!(self.socket.clear_negotiating());
560 self.write()
561 } else {
562 let res = if self.state.is_connecting() {
563 trace!("Ready to read handshake from {}.", self.peer_addr());
564 self.read_handshake()
565 } else {
566 trace!("Ready to read messages from {}.", self.peer_addr());
567 if let Some(_) = try!(self.buffer_in()) {
568 if let Err(err) = self.read_frames() {
570 if let Kind::Io(_) = err.kind {
574 return Err(err)
575 }
576 self.error(err)
577 }
578 }
579 Ok(())
580 };
581
582 if self.socket.is_negotiating() && res.is_ok() {
583 }
584 res
585 }
586 }
587
588 fn read_frames(&mut self) -> Result<()> {
589 while let Some(mut frame) = try!(Frame::parse(&mut self.in_buffer)) {
590 match self.state {
591 RespondingClose | FinishedClose => continue,
593 _ => (),
594 }
595
596 if self.settings.masking_strict {
597 if frame.is_masked() {
598 if self.is_client() {
599 return Err(Error::new(Kind::Protocol, "Received masked frame from a server endpoint."))
600 }
601 } else {
602 if self.is_server() {
603 return Err(Error::new(Kind::Protocol, "Received unmasked frame from a client endpoint."))
604 }
605 }
606 }
607
608 frame.remove_mask();
610
611 if let Some(frame) = try!(self.handler.on_frame(frame)) {
612 if frame.is_final() {
613 match frame.opcode() {
614 OpCode::Text => {
616 if !self.fragments.is_empty() {
620 return Err(Error::new(Kind::Protocol, "Received unfragmented text frame while processing fragmented message."))
621 }
622 let msg = Message::text(try!(String::from_utf8(frame.into_data()).map_err(|err| err.utf8_error())));
623 try!(self.handler.on_message(msg));
624 }
625 OpCode::Binary => {
626 if !self.fragments.is_empty() {
630 return Err(Error::new(Kind::Protocol, "Received unfragmented binary frame while processing fragmented message."))
631 }
632 let data = frame.into_data();
633 try!(self.handler.on_message(Message::binary(data)));
634 }
635 OpCode::Close => {
637 if self.state.is_closing() {
640
641 if self.is_server() {
642 self.over = true;
644 } else {
645 }
648
649 } else {
650
651 self.state = RespondingClose;
653
654 }
655
656 let mut close_code = [0u8; 2];
657 let mut data = Cursor::new(frame.into_data());
658 if let 2 = try!(data.read(&mut close_code)) {
659 let code_be: u16 = unsafe {transmute(close_code) };
660 trace!("Connection to {} received raw close code: {:?}, {:b}", self.peer_addr(), code_be, code_be);
661 let named = CloseCode::from(u16::from_be(code_be));
662 if let CloseCode::Other(code) = named {
663 if
664 code < 1000 ||
665 code >= 5000 ||
666 code == 1004 ||
667 code == 1014 ||
668 code == 1016 || code == 1100 || code == 2000 ||
671 code == 2999
672 {
673 return Err(Error::new(Kind::Protocol, format!("Received invalid close code from endpoint: {}", code)))
674 }
675 }
676 let has_reason = {
677 if let Ok(reason) = from_utf8(&data.get_ref()[2..]) {
678 self.handler.on_close(named, reason); true
680 } else {
681 self.handler.on_close(named, "");
682 false
683 }
684 };
685
686 if let CloseCode::Abnormal = named {
687 return Err(Error::new(Kind::Protocol, "Received abnormal close code from endpoint."))
688 } else if let CloseCode::Status = named {
689 return Err(Error::new(Kind::Protocol, "Received no status close code from endpoint."))
690 } else if let CloseCode::Restart = named {
691 return Err(Error::new(Kind::Protocol, "Restart close code is not supported."))
692 } else if let CloseCode::Again = named {
693 return Err(Error::new(Kind::Protocol, "Try again later close code is not supported."))
694 } else if let CloseCode::Tls = named {
695 return Err(Error::new(Kind::Protocol, "Received TLS close code outside of TLS handshake."))
696 } else {
697 if !self.state.is_closing() {
698 if has_reason {
699 try!(self.send_close(named, "")); } else {
701 try!(self.send_close(CloseCode::Invalid, ""));
702 }
703 }
704 }
705 } else {
706 self.handler.on_close(CloseCode::Status, "Unable to read close code. Sending empty close frame.");
709 if !self.state.is_closing() {
710 try!(self.send_close(CloseCode::Empty, ""));
711 }
712 }
713 }
714 OpCode::Ping => {
715 try!(self.send_pong(frame.into_data()));
717 }
718 OpCode::Pong => {
719 }
722 OpCode::Continue => {
724 if let Some(first) = self.fragments.pop_front() {
726 let size = self.fragments.iter().fold(first.payload().len() + frame.payload().len(), |len, frame| len + frame.payload().len());
727 match first.opcode() {
728 OpCode::Text => {
729 trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
730 let mut data = Vec::with_capacity(size);
731 data.extend(first.into_data());
732 while let Some(frame) = self.fragments.pop_front() {
733 data.extend(frame.into_data());
734 }
735 data.extend(frame.into_data());
736
737 let string = try!(String::from_utf8(data).map_err(|err| err.utf8_error()));
738
739 trace!("Calling handler with constructed message: {:?}", string);
740 try!(self.handler.on_message(Message::text(string)));
741 }
742 OpCode::Binary => {
743 trace!("Constructing text message from fragments: {:?} -> {:?} -> {:?}", first, self.fragments.iter().collect::<Vec<&Frame>>(), frame);
744 let mut data = Vec::with_capacity(size);
745 data.extend(first.into_data());
746
747 while let Some(frame) = self.fragments.pop_front() {
748 data.extend(frame.into_data());
749 }
750
751 data.extend(frame.into_data());
752
753 trace!("Calling handler with constructed message: {:?}", data);
754 try!(self.handler.on_message(Message::binary(data)));
755 }
756 _ => {
757 return Err(Error::new(Kind::Protocol, "Encounted fragmented control frame."))
758 }
759 }
760 } else {
761 return Err(Error::new(Kind::Protocol, "Unable to reconstruct fragmented message. No first frame."))
762 }
763 }
764 _ => return Err(Error::new(Kind::Protocol, "Encountered invalid opcode.")),
765 }
766 } else {
767 if frame.is_control() {
768 return Err(Error::new(Kind::Protocol, "Encounted fragmented control frame."))
769 } else {
770 if !self.settings.fragments_grow &&
772 self.settings.fragments_capacity == self.fragments.len()
773 {
774 return Err(Error::new(Kind::Capacity, "Exceeded max fragments."))
775 } else {
776 self.fragments.push_back(frame)
777 }
778 }
779 }
780 }
781 }
782 Ok(())
783 }
784
785 pub fn write(&mut self) -> Result<()> {
786 if self.socket.is_negotiating() {
787 trace!("Performing TLS negotiation on {}.", self.peer_addr());
788 try!(self.socket.clear_negotiating());
789 self.read()
790 } else {
791 let res = if self.state.is_connecting() {
792 trace!("Ready to write handshake to {}.", self.peer_addr());
793 self.write_handshake()
794 } else {
795 trace!("Ready to write messages to {}.", self.peer_addr());
796
797 let len = try!(self.socket.do_write_buf(&mut self.out_buffer));
800 trace!("Wrote {} bytes to {}", len, self.peer_addr());
801 let finished = len == 0 || self.out_buffer.position() == self.out_buffer.get_ref().len() as u64;
802 if finished {
803 match self.state {
804 FinishedClose if self.is_server() => return Ok(self.over = true),
807 _ => (),
808 }
809 }
810
811 self.out_buffer.get_mut().clear();
812 self.out_buffer.set_position(0);
813
814 self.check_events()
816 };
817
818 if self.socket.is_negotiating() && res.is_ok() {
819 }
820 res
821 }
822 }
823
824 pub fn send_message(&mut self, msg: Message) -> Result<()> {
825 if self.state.is_closing() {
826 trace!("Connection is closing. Ignoring request to send message {:?} to {}.",
827 msg,
828 self.peer_addr());
829 return Ok(())
830 }
831
832 let opcode = msg.opcode();
833 trace!("Message opcode {:?}", opcode);
834 let data = msg.into_data();
835
836 if let Some(frame) = try!(self.handler.on_send_frame(Frame::message(data, opcode, true))) {
837
838 if frame.payload().len() > self.settings.fragment_size {
839 trace!("Chunking at {:?}.", self.settings.fragment_size);
840 let mut chunks = frame.payload().chunks(self.settings.fragment_size).peekable();
842 let chunk = chunks.next().expect("Unable to get initial chunk!");
843
844 let mut first = Frame::message(Vec::from(chunk), opcode, false);
845
846 first.set_rsv1(frame.has_rsv1());
848 first.set_rsv2(frame.has_rsv2());
849 first.set_rsv3(frame.has_rsv3());
850
851 try!(self.buffer_frame(first));
852
853 while let Some(chunk) = chunks.next() {
854 if let Some(_) = chunks.peek() {
855 try!(self.buffer_frame(
856 Frame::message(Vec::from(chunk), OpCode::Continue, false)));
857 } else {
858 try!(self.buffer_frame(
859 Frame::message(Vec::from(chunk), OpCode::Continue, true)));
860 }
861 }
862
863 } else {
864 trace!("Sending unfragmented message frame.");
865 try!(self.buffer_frame(frame));
867 }
868
869 }
870 self.check_events()
871 }
872
873 #[inline]
874 pub fn send_ping(&mut self, data: Vec<u8>) -> Result<()> {
875 if self.state.is_closing() {
876 trace!("Connection is closing. Ignoring request to send ping {:?} to {}.",
877 data,
878 self.peer_addr());
879 return Ok(())
880 }
881 trace!("Sending ping to {}.", self.peer_addr());
882
883 if let Some(frame) = try!(self.handler.on_send_frame(Frame::ping(data))) {
884 try!(self.buffer_frame(frame));
885 }
886 self.check_events()
887 }
888
889 #[inline]
890 pub fn send_pong(&mut self, data: Vec<u8>) -> Result<()> {
891 if self.state.is_closing() {
892 trace!("Connection is closing. Ignoring request to send pong {:?} to {}.",
893 data,
894 self.peer_addr());
895 return Ok(())
896 }
897 trace!("Sending pong to {}.", self.peer_addr());
898
899 if let Some(frame) = try!(self.handler.on_send_frame(Frame::pong(data))) {
900 try!(self.buffer_frame(frame));
901 }
902 self.check_events()
903 }
904
905 #[inline]
906 pub fn send_close<R>(&mut self, code: CloseCode, reason: R) -> Result<()>
907 where R: Borrow<str>
908 {
909 match self.state {
910 RespondingClose => self.state = FinishedClose,
913 AwaitingClose | FinishedClose => {
915 trace!("Connection is already closing. Ignoring close {:?} -- {:?} to {}.",
916 code,
917 reason.borrow(),
918 self.peer_addr());
919 return self.check_events()
920 }
921 Open => self.state = AwaitingClose,
923 Connecting(_, _) => {
924 self.disconnect();
925 return self.check_events();
926 }
928 }
929
930 trace!("Sending close {:?} -- {:?} to {}.", code, reason.borrow(), self.peer_addr());
931
932 if let Some(frame) = try!(self.handler.on_send_frame(Frame::close(code, reason.borrow()))) {
933 try!(self.buffer_frame(frame));
934 }
935
936 trace!("Connection to {} is now closing.", self.peer_addr());
937
938 self.check_events()
939 }
940
941 fn check_events(&mut self) -> Result<()> {
942 if !self.state.is_connecting() {
943 if self.out_buffer.position() < self.out_buffer.get_ref().len() as u64 {
944 return self.write();
945 }
946 }
947 Ok(())
948 }
949
950 fn buffer_frame(&mut self, mut frame: Frame) -> Result<()> {
951 try!(self.check_buffer_out(&frame));
952
953 if self.is_client() {
954 frame.set_mask();
955 }
956
957 let pos = self.out_buffer.position();
960 try!(self.out_buffer.seek(SeekFrom::End(0)));
961 try!(frame.format(&mut self.out_buffer));
962 try!(self.out_buffer.seek(SeekFrom::Start(pos)));
963 try!(self.write());
964 Ok(())
965 }
966
967 fn check_buffer_out(&mut self, frame: &Frame) -> Result<()> {
968
969 if self.out_buffer.get_ref().capacity() <= self.out_buffer.get_ref().len() + frame.len() {
970 let mut new = Vec::with_capacity(self.out_buffer.get_ref().capacity());
972 new.extend(&self.out_buffer.get_ref()[self.out_buffer.position() as usize ..]);
973 if new.len() == new.capacity() {
974 if self.settings.out_buffer_grow {
975 new.reserve(self.settings.out_buffer_capacity)
976 } else {
977 return Err(Error::new(Kind::Capacity, "Maxed out output buffer for connection."))
978 }
979 }
980 self.out_buffer = Cursor::new(new);
981 }
982 Ok(())
983 }
984
985 fn buffer_in(&mut self) -> Result<Option<usize>> {
986 let len = try!(Self::do_read_buf(&mut self.cur_data, &mut self.in_buffer));
987 if len == 0 {
988 return Err(Error::new(Kind::CloseSingal, "Message Read Empty Size"));
989 }
990
991 if self.in_buffer.get_ref().len() == self.in_buffer.get_ref().capacity() {
992 let mut new = Vec::with_capacity(self.in_buffer.get_ref().capacity());
994 new.extend(&self.in_buffer.get_ref()[self.in_buffer.position() as usize ..]);
995 if new.len() == new.capacity() {
996 if !self.settings.in_buffer_grow {
997 return Err(Error::new(Kind::Capacity, "Maxed out input buffer for connection."))
998 }
999 if new.capacity() == self.settings.in_buffer_capacity {
1000 return Err(Error::new(Kind::Capacity, "Maxed out input buffer for connection."))
1001 }
1002 let now_len = new.len();
1003 new.reserve(::std::cmp::min(self.settings.in_buffer_capacity, now_len * 2));
1004 self.in_buffer = Cursor::new(new);
1005 trace!("Buffered {}.", len);
1008 } else {
1009 self.in_buffer = Cursor::new(new);
1010 }
1011 }
1012
1013 Ok(Some(len))
1014 }
1015}