1use std::cmp::{min, max};
2use std::collections::VecDeque;
3use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};
4use std::io::{Result, Error, ErrorKind};
5use util::{now_microseconds, ewma, Sequence};
6use packet::{Packet, PacketType, Encodable, Decodable, ExtensionType, HEADER_SIZE};
7use rand::{self, Rng};
8use time::SteadyTime;
9use time;
10use std::time::Duration;
11
/// Size in bytes of the scratch buffers used for receiving; also the window
/// size advertised in replies sent from `recv`.
const BUF_SIZE: usize = 1500;
/// Multiplier applied to the congestion window adjustment in
/// `update_congestion_window`.
const GAIN: f64 = 1.0;
/// How many `MSS` units the congestion window may exceed the bytes in flight.
const ALLOWED_INCREASE: u32 = 1;
/// Target queuing delay. Compared against delays derived from
/// `now_microseconds()`, so the unit is microseconds.
const TARGET: i64 = 100_000;
/// Segment size in bytes: payloads are chunked to `MSS - HEADER_SIZE` and the
/// congestion window is expressed in multiples of it.
const MSS: u32 = 1400;
/// Lower bound of the congestion window, in `MSS` units.
const MIN_CWND: u32 = 2;
/// Initial congestion window, in `MSS` units.
const INIT_CWND: u32 = 2;
/// Initial congestion timeout, in milliseconds.
const INITIAL_CONGESTION_TIMEOUT: u64 = 1000;
/// Lower bound applied to the computed congestion timeout, in milliseconds.
const MIN_CONGESTION_TIMEOUT: u64 = 500;
/// Upper bound applied to the computed congestion timeout, in milliseconds.
const MAX_CONGESTION_TIMEOUT: u64 = 60_000;
/// Maximum number of entries kept in the base-delay history.
const BASE_HISTORY: usize = 10;
/// Number of timed-out attempts after which connecting gives up.
const MAX_SYN_RETRIES: u32 = 5;
/// Default for `UtpSocket::max_retransmission_retries`.
const MAX_RETRANSMISSION_RETRIES: u32 = 5;
/// Longest time, in microseconds, `send_packet` waits for window space.
const PRE_SEND_TIMEOUT: u32 = 500_000;

/// Age, in microseconds, after which a new base-delay history slot is started.
const MAX_BASE_DELAY_AGE: i64 = 60_000_000;
33
/// Socket-level failure conditions. Each variant converts into a
/// `std::io::Error` through the `From` implementation in this file.
#[derive(Debug)]
pub enum SocketError {
    /// An operation was attempted on a socket that is already closed.
    ConnectionClosed,
    /// The remote peer sent a reset packet.
    ConnectionReset,
    /// Connecting or retransmitting ran out of retries.
    ConnectionTimedOut,
    /// The user-configured read timeout elapsed.
    UserTimedOut,
    /// The supplied address resolved to no socket address.
    InvalidAddress,
    /// A received datagram could not be parsed as a packet.
    InvalidPacket,
    /// The peer answered a SYN with an unexpected packet type.
    InvalidReply,
    /// The socket has no established connection.
    NotConnected,
}
45
46impl From<SocketError> for Error {
47 fn from(error: SocketError) -> Error {
48 use self::SocketError::*;
49 let (kind, message) = match error {
50 ConnectionClosed => (ErrorKind::NotConnected, "The socket is closed"),
51 ConnectionReset => {
52 (ErrorKind::ConnectionReset,
53 "Connection reset by remote peer")
54 }
55 ConnectionTimedOut | UserTimedOut => (ErrorKind::TimedOut, "Connection timed out"),
56 InvalidAddress => (ErrorKind::InvalidInput, "Invalid address"),
57 InvalidPacket => (ErrorKind::Other, "Error parsing packet"),
58 InvalidReply => {
59 (ErrorKind::ConnectionRefused,
60 "The remote peer sent an invalid reply")
61 }
62 NotConnected => (ErrorKind::NotConnected, "The socket is not connected"),
63 };
64 Error::new(kind, message)
65 }
66}
67
/// Connection life-cycle states of a `UtpSocket`.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
enum SocketState {
    /// Freshly created; no handshake has taken place.
    New,
    /// Handshake completed.
    Connected,
    /// A SYN has been sent and its reply is awaited.
    SynSent,
    /// A FIN has been sent by this side.
    FinSent,
    /// A reset packet was received from the peer.
    ResetReceived,
    /// The connection is finished.
    Closed,
}
77
/// One delay measurement kept in `UtpSocket::current_delays`.
struct DelayDifferenceSample {
    /// Time the sample was recorded (the `now` passed to `update_current_delay`).
    received_at: i64,
    /// The measured delay value.
    difference: i64,
}
82
83fn take_address<A: ToSocketAddrs>(addr: A) -> Result<SocketAddr> {
85 addr.to_socket_addrs()
86 .and_then(|mut it| it.next().ok_or(From::from(SocketError::InvalidAddress)))
87}
88
/// Copies as many bytes as fit from the start of `src` into the start of `dst`.
///
/// Returns the number of bytes copied, i.e. `min(src.len(), dst.len())`. Bytes
/// of `dst` beyond that count are left untouched.
///
/// Despite the historical name, this is implemented with the safe
/// `copy_from_slice`; `src` and `dst` can never overlap because one is a
/// shared and the other an exclusive borrow.
fn unsafe_copy(src: &[u8], dst: &mut [u8]) -> usize {
    let len = min(src.len(), dst.len());
    dst[..len].copy_from_slice(&src[..len]);
    len
}
97
/// A uTP socket layered on top of a `UdpSocket`.
pub struct UtpSocket {
    /// Underlying UDP socket used for every send and receive.
    socket: UdpSocket,

    /// Address datagrams are sent to. Initialised to the local address when
    /// the socket is bound and replaced by the peer address on connect/accept.
    connected_to: SocketAddr,

    /// Connection id written into outgoing non-SYN packets.
    sender_connection_id: u16,

    /// Connection id written into outgoing SYN packets.
    receiver_connection_id: u16,

    /// Sequence number assigned to the next outgoing packet.
    seq_nr: u16,

    /// Sequence number acknowledged to the peer in outgoing packets.
    ack_nr: u16,

    /// Current connection state.
    state: SocketState,

    /// Received data packets not yet handed to the caller, kept ordered by
    /// sequence number (see `insert_into_buffer`).
    incoming_buffer: Vec<Packet>,

    /// Packets that have been sent and are awaiting acknowledgement.
    send_window: Vec<Packet>,

    /// Packets queued by `send_to` that have not been sent yet.
    unsent_queue: VecDeque<Packet>,

    /// Number of consecutive state packets carrying the same `ack_nr`.
    duplicate_ack_count: u32,

    /// `ack_nr` carried by the most recent state packet.
    last_acked: u16,

    /// `now_microseconds()` value recorded when `last_acked` last changed.
    last_acked_timestamp: u32,

    /// Sequence number of the last packet removed from `incoming_buffer`.
    last_dropped: u16,

    /// Smoothed round-trip estimate maintained by `update_congestion_timeout`.
    rtt: i32,

    /// Round-trip variance estimate maintained by `update_congestion_timeout`.
    rtt_variance: i32,

    /// Payload bytes of the head packet that did not fit in the caller's
    /// buffer on a previous read.
    pending_data: VecDeque<u8>,

    /// Payload bytes received while `send_packet` was waiting for window
    /// space; returned first by the next `recv_from`.
    read_ready_data: VecDeque<u8>,

    /// Total size in bytes of the packets in `send_window`.
    curr_window: u32,

    /// Window size advertised by the peer in the last handled packet.
    remote_wnd_size: u32,

    /// History of base delay values, at most `BASE_HISTORY` entries.
    base_delays: VecDeque<i64>,

    /// Recent delay samples used to compute the filtered current delay.
    current_delays: Vec<DelayDifferenceSample>,

    /// Difference between local time and the timestamp of the last handled
    /// packet; echoed back as the timestamp difference of outgoing packets.
    their_delay: u32,

    /// Time at which the newest `base_delays` slot was started.
    last_rollover: i64,

    /// Current receive timeout in milliseconds.
    congestion_timeout: u64,

    /// Congestion window in bytes.
    cwnd: u32,

    /// Number of consecutive receive timeouts tolerated before the socket is
    /// marked closed and `recv` fails with a timeout error.
    pub max_retransmission_retries: u32,

    /// User-configured read timeout in milliseconds; `0` disables it.
    user_read_timeout: u64,

    /// Time of the last received datagram or handled receive timeout.
    last_congestion_update: SteadyTime,

    /// Consecutive receive timeouts since the last received datagram.
    retries: u32,

    /// State packet sent in reply to the peer's SYN, kept so it can be sent
    /// again if the SYN is received again.
    state_packet: Option<Packet>,

    /// Time the last message was sent; consulted by `send_keepalive`.
    last_msg_sent_timestamp: SteadyTime,
}
225
226impl UtpSocket {
227 fn from_raw_parts(s: UdpSocket, src: SocketAddr) -> UtpSocket {
231 let (receiver_id, sender_id) =
236 || -> (u16, u16) {
237 let mut rng = rand::thread_rng();
238 loop {
239 let id = rng.gen::<u16>();
240 if id.checked_add(1).is_some() {
241 return (id, id + 1);
242 }
243 }
244 }();
245
246 UtpSocket {
247 socket: s,
248 connected_to: src,
249 receiver_connection_id: receiver_id,
250 sender_connection_id: sender_id,
251 seq_nr: 1,
252 ack_nr: 0,
253 state: SocketState::New,
254 incoming_buffer: Vec::new(),
255 send_window: Vec::new(),
256 unsent_queue: VecDeque::new(),
257 duplicate_ack_count: 0,
258 last_acked: 0,
259 last_acked_timestamp: 0,
260 last_dropped: 0,
261 rtt: 0,
262 rtt_variance: 0,
263 read_ready_data: VecDeque::new(),
264 pending_data: VecDeque::new(),
265 curr_window: 0,
266 remote_wnd_size: 0,
267 current_delays: Vec::new(),
268 base_delays: VecDeque::with_capacity(BASE_HISTORY),
269 their_delay: 0,
270 last_rollover: 0,
271 congestion_timeout: INITIAL_CONGESTION_TIMEOUT,
272 cwnd: INIT_CWND * MSS,
273 max_retransmission_retries: MAX_RETRANSMISSION_RETRIES,
274 user_read_timeout: 0,
275 last_congestion_update: SteadyTime::now(),
276 retries: 0,
277 state_packet: None,
278 last_msg_sent_timestamp: SteadyTime::now(),
279 }
280 }
281
282 pub fn bind_with_udp_socket(socket: UdpSocket) -> Result<UtpSocket> {
284 socket.local_addr().map(|a| UtpSocket::from_raw_parts(socket, a))
285 }
286
287 pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<UtpSocket> {
294 take_address(addr).and_then(|a| UdpSocket::bind(a).map(|s| UtpSocket::from_raw_parts(s, a)))
295 }
296
297 pub fn local_addr(&self) -> Result<SocketAddr> {
299 self.socket.local_addr()
300 }
301
302 pub fn peer_addr(&self) -> Result<SocketAddr> {
304 if self.state == SocketState::Connected || self.state == SocketState::FinSent {
305 Ok(self.connected_to)
306 } else {
307 Err(Error::from(SocketError::NotConnected))
308 }
309 }
310
311 pub fn connect<A: ToSocketAddrs>(other: A) -> Result<UtpSocket> {
318 let addr = try!(take_address(other));
319 let my_addr = match addr {
320 SocketAddr::V4(_) => "0.0.0.0:0",
321 SocketAddr::V6(_) => ":::0",
322 };
323 let mut socket = try!(UtpSocket::bind(my_addr));
324 socket.connected_to = addr;
325
326 let mut packet = Packet::new();
327 packet.set_type(PacketType::Syn);
328 packet.set_connection_id(socket.receiver_connection_id);
329 packet.set_seq_nr(socket.seq_nr);
330
331 let mut buf = [0; BUF_SIZE];
332 let mut syn_timeout = socket.congestion_timeout;
333 let mut syn_retries = 0;
334
335 while syn_retries < MAX_SYN_RETRIES {
336 packet.set_timestamp_microseconds(now_microseconds());
337
338 debug!("Connecting to {}", socket.connected_to);
340 try!(socket.socket.send_to(&packet.to_bytes()[..], socket.connected_to));
341 socket.state = SocketState::SynSent;
342 debug!("sent {:?}", packet);
343
344 socket.socket
346 .set_read_timeout(Some(Duration::from_millis(syn_timeout)))
347 .expect("Error setting read timeout");
348 match socket.socket.recv_from(&mut buf) {
349 Ok((read, addr)) => {
350 let packet = try!(Packet::from_bytes(&buf[..read]).or(Err(SocketError::InvalidPacket)));
351
352 socket.connected_to = addr;
353
354 if packet.get_type() != PacketType::State {
355 syn_retries += 1;
358 continue;
359 }
360
361 try!(socket.handle_packet(&packet, addr));
362
363 return Ok(socket);
364 },
365 Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
366 e.kind() == ErrorKind::TimedOut) => {
367 debug!("Timed out, retrying");
368 syn_timeout *= 2;
369 syn_retries += 1;
370 continue;
371 }
372 Err(e) => return Err(e),
373 };
374 }
375
376 Err(Error::from(SocketError::ConnectionTimedOut))
377 }
378
379 pub fn rendezvous_connect<A: ToSocketAddrs>(udp_socket: UdpSocket,
390 other: A)
391 -> Result<UtpSocket> {
392 let addr = try!(take_address(other));
393 let mut socket = try!(UtpSocket::bind_with_udp_socket(udp_socket));
394 socket.rendezvous_connect_to(addr).map(|_| socket)
395 }
396
    /// Performs a simultaneous-open handshake with the peer at `addr`.
    ///
    /// A SYN is sent repeatedly; the handshake completes once both a SYN and
    /// a state packet have been received, at which point the state packet is
    /// passed to `handle_packet`. Only receive timeouts count towards
    /// `MAX_SYN_RETRIES`.
    ///
    /// # Errors
    ///
    /// Propagates send/receive/timeout-configuration errors and errors from
    /// `handle_packet`; returns `ConnectionTimedOut` when retries run out.
    fn rendezvous_connect_to(&mut self, addr: SocketAddr) -> Result<()> {
        self.connected_to = addr;

        let mut packet = Packet::new();
        packet.set_type(PacketType::Syn);
        packet.set_connection_id(self.receiver_connection_id);
        packet.set_seq_nr(self.seq_nr);

        let mut buf = [0; BUF_SIZE];

        let mut syn_timeout = self.congestion_timeout;
        let mut retry_count = 0;

        // Both halves of the handshake must be seen before it is complete.
        let mut rx_syn: Option<Packet> = None;
        let mut rx_state: Option<Packet> = None;

        while retry_count < MAX_SYN_RETRIES {
            packet.set_timestamp_microseconds(now_microseconds());

            debug!("Connecting to {}", self.connected_to);
            try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
            self.last_msg_sent_timestamp = SteadyTime::now();
            self.state = SocketState::SynSent;
            debug!("sent {:?}", packet);

            try!(self.socket.set_read_timeout(Some(Duration::from_millis(syn_timeout))));

            match self.socket.recv_from(&mut buf) {
                Ok((read, src)) => {
                    let mut packet = match Packet::from_bytes(&buf[..read]) {
                        Ok(packet) => packet,
                        Err(_) => {
                            // Unparseable datagram: ignore it and send the SYN again.
                            continue;
                        }
                    };

                    // Both sides pick their own id; the smaller of the two
                    // is adopted as the connection id.
                    let cid = min(self.receiver_connection_id, packet.connection_id());

                    packet.set_connection_id(cid);

                    match packet.get_type() {
                        PacketType::Syn => {
                            self.receiver_connection_id = cid;
                            self.sender_connection_id = cid + 1;

                            // Acknowledge the peer's SYN with a state packet.
                            let reply = self.prepare_reply(&packet, PacketType::State);
                            try!(self.socket.send_to(&reply.to_bytes()[..], self.connected_to));
                            self.last_msg_sent_timestamp = SteadyTime::now();

                            rx_syn = Some(packet);
                        }
                        PacketType::State => {
                            self.receiver_connection_id = cid;
                            self.sender_connection_id = cid + 1;

                            rx_state = Some(packet);
                        }
                        _ => continue,
                    }

                    match (&rx_syn, &rx_state) {
                        (&Some(ref _syn), &Some(ref state)) => {
                            try!(self.handle_packet(state, src));
                            return Ok(());
                        }
                        _ => continue,
                    }
                }
                Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
                               e.kind() == ErrorKind::TimedOut) => {
                    debug!("Timed out, retrying");
                    // Back off exponentially between attempts.
                    syn_timeout *= 2;
                    retry_count += 1;
                    continue;
                }
                Err(e) => return Err(e),
            };
        }

        Err(Error::from(SocketError::ConnectionTimedOut))
    }
484
485 pub fn close(&mut self) -> Result<()> {
490 if self.state == SocketState::Closed || self.state == SocketState::New ||
492 self.state == SocketState::SynSent {
493 return Ok(());
494 }
495
496 try!(self.flush());
498
499 let mut packet = Packet::new();
500 packet.set_connection_id(self.sender_connection_id);
501 packet.set_seq_nr(self.seq_nr);
502 packet.set_ack_nr(self.ack_nr);
503 packet.set_timestamp_microseconds(now_microseconds());
504 packet.set_type(PacketType::Fin);
505
506 try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
508 self.last_msg_sent_timestamp = SteadyTime::now();
509 debug!("sent {:?}", packet);
510 self.state = SocketState::FinSent;
511
512 let mut buf = [0; BUF_SIZE];
514 while self.state != SocketState::Closed {
515 try!(self.recv(&mut buf, false));
516 }
517
518 Ok(())
519 }
520
521 pub fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
527 let read = {
528 let (read_ready_0, read_ready_1) = self.read_ready_data.as_slices();
529 let mut read = 0;
530 if read_ready_0.len() > 0 {
531 read = unsafe_copy(read_ready_0, buf)
532 }
533 if read_ready_1.len() > 0 {
534 read += unsafe_copy(read_ready_1, &mut buf[read..])
535 }
536 read
537 };
538 if read > 0 {
539 self.read_ready_data.drain(..read);
540 return Ok((read, self.connected_to));
541 }
542
543 let read = self.flush_incoming_buffer(buf);
544
545 if read > 0 {
546 Ok((read, self.connected_to))
547 } else {
548 if self.state == SocketState::ResetReceived {
551 return Err(Error::from(SocketError::ConnectionReset));
552 }
553
554 loop {
555 if self.state == SocketState::Closed {
557 return Ok((0, self.connected_to));
558 }
559
560 match self.recv(buf, true) {
561 Ok((0, _src)) => continue,
562 Ok(x) => return Ok(x),
563 Err(e) => return Err(e),
564 }
565 }
566 }
567 }
568
569 pub fn set_read_timeout(&mut self, user_timeout: Option<u64>) {
572 self.user_read_timeout = match user_timeout {
573 Some(t) => {
574 if t > 0 {
575 t
576 } else {
577 0
578 }
579 }
580 None => 0,
581 }
582 }
583
584 #[cfg(windows)]
585 fn ignore_udp_error(e: &Error) -> bool {
586 const WSAECONNRESET: i32 = 10054;
607 const WSAEMSGSIZE: i32 = 10040;
608 match e.raw_os_error() {
609 Some(e) => match e {
610 WSAECONNRESET | WSAEMSGSIZE => true,
611 _ => false,
612 },
613 None => false,
614 }
615 }
616
617 #[cfg(not(windows))]
618 fn ignore_udp_error(_: &Error) -> bool {
619 false
620 }
621
622 fn recv(&mut self, buf: &mut [u8], use_user_timeout: bool) -> Result<(usize, SocketAddr)> {
623 let mut b = [0; BUF_SIZE + HEADER_SIZE];
624 let now = SteadyTime::now();
625 let (read, src);
626 let user_timeout = if use_user_timeout {
627 self.user_read_timeout
628 } else {
629 0
630 };
631 let use_user_timeout = user_timeout != 0;
632
633 loop {
635 if self.retries >= self.max_retransmission_retries {
637 debug!("exceeds max_retransmission_retries : {} ; current connect state is : {:?}",
638 self.max_retransmission_retries,
639 self.state);
640 self.state = SocketState::Closed;
641 debug!("socket marked as closed from {:?} to {:?}",
642 self.local_addr(),
643 self.connected_to);
644 return Err(Error::from(SocketError::ConnectionTimedOut));
645 }
646
647 let timeout;
648 let congestion_timeout = if self.state != SocketState::New {
649 debug!("setting read timeout of {} ms", self.congestion_timeout);
650 Some(Duration::from_millis(self.congestion_timeout))
651 } else {
652 None
653 };
654 {
655 let user_timeout = Duration::from_millis(user_timeout);
656 timeout = if use_user_timeout {
657 match congestion_timeout {
658 Some(congestion_timeout) => {
659 use std::cmp::min;
660 Some(min(congestion_timeout, user_timeout))
661 }
662 None => Some(user_timeout),
663 }
664 } else {
665 congestion_timeout
666 };
667 }
668
669 if use_user_timeout {
670 let user_timeout = time::Duration::milliseconds(user_timeout as i64);
671 if (SteadyTime::now() - now) >= user_timeout {
672 return Err(Error::from(SocketError::UserTimedOut));
673 }
674 }
675
676 self.socket.set_read_timeout(timeout).expect("Error setting read timeout");
677 match self.socket.recv_from(&mut b) {
678 Ok((r, s)) => {
679 read = r;
680 src = s;
681 break;
682 }
683 Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
684 e.kind() == ErrorKind::TimedOut) => {
685 debug!("recv_from timed out");
686 let now = SteadyTime::now();
687 let congestion_timeout = {
688 time::Duration::milliseconds(self.congestion_timeout as i64)
689 };
690 if !use_user_timeout ||
691 ((now - self.last_congestion_update) >= congestion_timeout) {
692 self.last_congestion_update = now;
693 try!(self.handle_receive_timeout());
694 self.retries += 1;
695 }
696 }
697 Err(ref e) if Self::ignore_udp_error(e) => (),
698 Err(e) => return Err(e),
699 };
700
701 let elapsed = (SteadyTime::now() - now).num_milliseconds();
702 debug!("{} ms elapsed", elapsed);
703 }
704
705 self.last_congestion_update = SteadyTime::now();
706 self.retries = 0;
707
708 let packet = match Packet::from_bytes(&b[..read]) {
710 Ok(packet) => packet,
711 Err(e) => {
712 debug!("{}", e);
713 debug!("Ignoring invalid packet");
714 return Ok((0, self.connected_to));
715 }
716 };
717 debug!("received {:?}", packet);
718
719 if let Some(mut pkt) = try!(self.handle_packet(&packet, src)) {
721 pkt.set_wnd_size(BUF_SIZE as u32);
722 try!(self.socket.send_to(&pkt.to_bytes()[..], src));
723 self.last_msg_sent_timestamp = SteadyTime::now();
724 debug!("sent {:?}", pkt);
725 }
726
727 if packet.get_type() == PacketType::Data {
730 if Sequence::less(self.last_dropped, packet.seq_nr()) {
731 self.insert_into_buffer(packet);
732 }
733 }
734
735 let read = self.flush_incoming_buffer(buf);
737
738 Ok((read, src))
739 }
740
741 fn handle_receive_timeout(&mut self) -> Result<()> {
742 self.congestion_timeout *= 2;
743 self.cwnd = MSS;
744
745 debug!("self.send_window: {:?}",
755 self.send_window
756 .iter()
757 .map(Packet::seq_nr)
758 .collect::<Vec<u16>>());
759
760 if self.send_window.is_empty() {
761 if self.state == SocketState::FinSent {
764 let mut packet = Packet::new();
765 packet.set_connection_id(self.sender_connection_id);
766 packet.set_seq_nr(self.seq_nr);
767 packet.set_ack_nr(self.ack_nr);
768 packet.set_timestamp_microseconds(now_microseconds());
769 packet.set_type(PacketType::Fin);
770
771 try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
773 self.last_msg_sent_timestamp = SteadyTime::now();
774 debug!("resent FIN: {:?}", packet);
775 } else if self.state != SocketState::New {
776 debug!("sending fast resend request");
779 self.send_fast_resend_request();
780 }
781 } else {
782 let mut packet = &mut self.send_window[0];
785 packet.set_timestamp_microseconds(now_microseconds());
786 try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
787 self.last_msg_sent_timestamp = SteadyTime::now();
788 debug!("resent {:?}", packet);
789 }
790
791 Ok(())
792 }
793
794 fn prepare_reply(&self, original: &Packet, t: PacketType) -> Packet {
795 let mut resp = Packet::new();
796 resp.set_type(t);
797 let self_t_micro: u32 = now_microseconds();
798 let other_t_micro: u32 = original.timestamp_microseconds();
799 resp.set_timestamp_microseconds(self_t_micro);
800 resp.set_timestamp_difference_microseconds(self_t_micro.wrapping_sub(other_t_micro));
801 resp.set_connection_id(self.sender_connection_id);
802 resp.set_seq_nr(self.seq_nr);
803 resp.set_ack_nr(self.ack_nr);
804
805 resp
806 }
807
808 fn advance_incoming_buffer(&mut self) -> Option<Packet> {
810 if !self.incoming_buffer.is_empty() {
811 let packet = self.incoming_buffer.remove(0);
812 debug!("Removed packet from incoming buffer: {:?}", packet);
813 self.ack_nr = packet.seq_nr();
814 self.last_dropped = self.ack_nr;
815 Some(packet)
816 } else {
817 None
818 }
819 }
820
821 fn flush_incoming_buffer(&mut self, buf: &mut [u8]) -> usize {
827 if !self.pending_data.is_empty() {
829 let flushed = {
830 let (pending_0, pending_1) = self.pending_data.as_slices();
831 let mut flushed = 0;
832 if pending_0.len() > 0 {
833 flushed += unsafe_copy(pending_0, buf);
834 }
835 if pending_1.len() > 0 {
836 flushed += unsafe_copy(pending_1, &mut buf[flushed..]);
837 }
838 flushed
839 };
840
841 if flushed == self.pending_data.len() {
842 self.pending_data.clear();
843 self.advance_incoming_buffer();
844 } else {
845 self.pending_data.drain(..flushed);
846 }
847
848 return flushed;
849 }
850
851 if !self.incoming_buffer.is_empty() &&
852 (self.ack_nr == self.incoming_buffer[0].seq_nr() ||
853 self.ack_nr.wrapping_add(1) == self.incoming_buffer[0].seq_nr())
854 {
855 let flushed = unsafe_copy(&self.incoming_buffer[0].payload[..], buf);
856
857 if flushed == self.incoming_buffer[0].payload.len() {
858 self.advance_incoming_buffer();
859 } else {
860 self.pending_data.extend(self.incoming_buffer[0].payload.drain(flushed..));
861 }
862
863 return flushed;
864 }
865
866 0
867 }
868
869 pub fn send_to(&mut self, buf: &[u8]) -> Result<usize> {
881 if self.state == SocketState::Closed {
882 return Err(Error::from(SocketError::ConnectionClosed));
883 }
884
885 let total_length = buf.len();
886
887 for chunk in buf.chunks(MSS as usize - HEADER_SIZE) {
888 let mut packet = Packet::with_payload(chunk);
889 packet.set_seq_nr(self.seq_nr);
890 packet.set_ack_nr(self.ack_nr);
891 packet.set_connection_id(self.sender_connection_id);
892
893 self.unsent_queue.push_back(packet);
894
895 if self.seq_nr == ::std::u16::MAX {
897 self.seq_nr = 0;
898 } else {
899 self.seq_nr += 1;
900 }
901 }
902
903 try!(self.send());
905
906 Ok(total_length)
907 }
908
909 pub fn flush(&mut self) -> Result<()> {
911 let mut buf = [0u8; BUF_SIZE];
912 while !self.send_window.is_empty() {
913 debug!("packets in send window: {}", self.send_window.len());
914 try!(self.recv(&mut buf, false));
915 }
916
917 Ok(())
918 }
919
920 fn send_state(&mut self) {
921 let mut packet = Packet::new();
922 packet.set_type(PacketType::State);
923 let self_t_micro: u32 = now_microseconds();
924 packet.set_timestamp_microseconds(self_t_micro);
925 packet.set_timestamp_difference_microseconds(self.their_delay);
926 packet.set_connection_id(self.sender_connection_id);
927 packet.set_seq_nr(self.seq_nr);
928 packet.set_ack_nr(self.ack_nr);
929 let _ = self.socket.send_to(&packet.to_bytes()[..], self.connected_to);
930 self.last_msg_sent_timestamp = SteadyTime::now();
931 }
932
933 pub fn send_keepalive(&mut self) {
935 if (SteadyTime::now() - self.last_msg_sent_timestamp).num_milliseconds()
936 >= 14_000 {
937 self.send_state();
938 }
939 }
940
941 fn send(&mut self) -> Result<()> {
943 while let Some(mut packet) = self.unsent_queue.pop_front() {
944 try!(self.send_packet(&mut packet));
945 self.curr_window += packet.len() as u32;
946 self.send_window.push(packet);
947 }
948 Ok(())
949 }
950
951 #[inline]
953 fn send_packet(&mut self, packet: &mut Packet) -> Result<()> {
954 debug!("current window: {}", self.send_window.len());
955 let max_inflight = min(self.cwnd, self.remote_wnd_size);
956 let max_inflight = max(MIN_CWND * MSS, max_inflight);
957 let now = now_microseconds();
958
959 while self.curr_window >= max_inflight && now_microseconds() - now < PRE_SEND_TIMEOUT {
962 debug!("self.curr_window: {}", self.curr_window);
963 debug!("max_inflight: {}", max_inflight);
964 debug!("self.duplicate_ack_count: {}", self.duplicate_ack_count);
965 debug!("now_microseconds() - now = {}", now_microseconds() - now);
966 let mut buf = [0; BUF_SIZE];
967 let (read, _) = try!(self.recv(&mut buf, false));
968 self.read_ready_data.extend(&buf[..read]);
969 }
970 debug!("out: now_microseconds() - now = {}",
971 now_microseconds() - now);
972
973 let distance_a = packet.seq_nr().wrapping_sub(self.last_acked);
978 let distance_b = self.last_acked.wrapping_sub(packet.seq_nr());
979 if distance_a > distance_b {
980 debug!("Packet already acknowledged, skipping...");
981 return Ok(());
982 }
983
984 packet.set_timestamp_microseconds(now_microseconds());
985 packet.set_timestamp_difference_microseconds(self.their_delay);
986 try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
987 self.last_msg_sent_timestamp = SteadyTime::now();
988 debug!("sent {:?}", packet);
989
990 Ok(())
991 }
992
993 fn update_base_delay(&mut self, base_delay: i64, now: i64) {
998 if self.base_delays.is_empty() || now - self.last_rollover > MAX_BASE_DELAY_AGE {
999 self.last_rollover = now;
1001
1002 if self.base_delays.len() == BASE_HISTORY {
1004 self.base_delays.pop_front();
1005 }
1006
1007 self.base_delays.push_back(base_delay);
1009 } else {
1010 let last_idx = self.base_delays.len() - 1;
1012 if base_delay < self.base_delays[last_idx] {
1013 self.base_delays[last_idx] = base_delay;
1014 }
1015 }
1016 }
1017
1018 fn update_current_delay(&mut self, v: i64, now: i64) {
1021 let rtt = self.rtt as i64 * 100;
1023 while !self.current_delays.is_empty() && now - self.current_delays[0].received_at > rtt {
1024 self.current_delays.remove(0);
1025 }
1026
1027 self.current_delays.push(DelayDifferenceSample {
1029 received_at: now,
1030 difference: v,
1031 });
1032 }
1033
1034 fn update_congestion_timeout(&mut self, current_delay: i32) {
1035 let delta = self.rtt - current_delay;
1036 self.rtt_variance += (delta.abs() - self.rtt_variance) / 4;
1037 self.rtt += (current_delay - self.rtt) / 8;
1038 self.congestion_timeout = max((self.rtt + self.rtt_variance * 4) as u64,
1039 MIN_CONGESTION_TIMEOUT);
1040 self.congestion_timeout = min(self.congestion_timeout, MAX_CONGESTION_TIMEOUT);
1041
1042 debug!("current_delay: {}", current_delay);
1043 debug!("delta: {}", delta);
1044 debug!("self.rtt_variance: {}", self.rtt_variance);
1045 debug!("self.rtt: {}", self.rtt);
1046 debug!("self.congestion_timeout: {}", self.congestion_timeout);
1047 }
1048
1049 fn filtered_current_delay(&self) -> i64 {
1055 let input = self.current_delays.iter().map(|x| x.difference);
1056 ewma(input, 0.333) as i64
1057 }
1058
1059 fn min_base_delay(&self) -> i64 {
1061 self.base_delays.iter().min().cloned().unwrap_or(0)
1062 }
1063
1064 fn build_selective_ack(&self) -> Vec<u8> {
1066 let stashed = self.incoming_buffer
1067 .iter()
1068 .filter(|pkt| pkt.seq_nr() > self.ack_nr + 1)
1069 .map(|pkt| (pkt.seq_nr() - self.ack_nr - 2) as usize)
1070 .map(|diff| (diff / 8, diff % 8));
1071
1072 let mut sack = Vec::new();
1073 for (byte, bit) in stashed {
1074 while byte >= sack.len() || sack.len() % 4 != 0 {
1077 sack.push(0u8);
1078 }
1079
1080 sack[byte] |= 1 << bit;
1081 }
1082
1083 sack
1084 }
1085
1086 fn send_fast_resend_request(&mut self) {
1091 for _ in 0..3 {
1092 self.send_state();
1093 }
1094 }
1095
1096 fn resend_lost_packet(&mut self, lost_packet_nr: u16) {
1097 debug!("---> resend_lost_packet({}) <---", lost_packet_nr);
1098 match self.send_window.iter().position(|pkt| pkt.seq_nr() == lost_packet_nr) {
1099 None => debug!("Packet {} not found", lost_packet_nr),
1100 Some(position) => {
1101 debug!("self.send_window.len(): {}", self.send_window.len());
1102 debug!("position: {}", position);
1103 let mut packet = self.send_window[position].clone();
1104 let _ = self.send_packet(&mut packet);
1106
1107 }
1110 }
1111 debug!("---> END resend_lost_packet <---");
1112 }
1113
1114 fn advance_send_window(&mut self) {
1116 if let Some(position) = self.send_window
1126 .iter()
1127 .position(|pkt| pkt.seq_nr() == self.last_acked) {
1128 for _ in 0..position + 1 {
1129 let packet = self.send_window.remove(0);
1130 self.curr_window -= packet.len() as u32;
1131 }
1132 }
1133 debug!("self.curr_window: {}", self.curr_window);
1134 }
1135
1136 fn handle_packet(&mut self, packet: &Packet, src: SocketAddr) -> Result<Option<Packet>> {
1140 debug!("({:?}, {:?})", self.state, packet.get_type());
1141
1142 let is_data_or_fin = packet.get_type() == PacketType::Data
1143 || packet.get_type() == PacketType::Fin;
1144
1145 if is_data_or_fin && packet.seq_nr().wrapping_sub(self.ack_nr) == 1 {
1151 self.ack_nr = packet.seq_nr();
1152 }
1153
1154 if packet.get_type() != PacketType::Syn && self.state != SocketState::SynSent &&
1156 !(packet.connection_id() == self.sender_connection_id ||
1157 packet.connection_id() == self.receiver_connection_id) {
1158 return Ok(Some(self.prepare_reply(packet, PacketType::Reset)));
1159 }
1160
1161 self.remote_wnd_size = packet.wnd_size();
1163 debug!("self.remote_wnd_size: {}", self.remote_wnd_size);
1164
1165 let now = now_microseconds();
1167 self.their_delay = now.wrapping_sub(packet.timestamp_microseconds());
1168 debug!("self.their_delay: {}", self.their_delay);
1169
1170 match (self.state, packet.get_type()) {
1171 (SocketState::New, PacketType::Syn) => {
1172 self.connected_to = src;
1173 self.ack_nr = packet.seq_nr();
1174 self.seq_nr = rand::random();
1175 self.last_acked = self.seq_nr.wrapping_sub(1);
1176 self.receiver_connection_id = packet.connection_id() + 1;
1177 self.sender_connection_id = packet.connection_id();
1178 self.state = SocketState::Connected;
1179 self.last_dropped = self.ack_nr;
1180
1181 self.state_packet = Some(self.prepare_reply(packet, PacketType::State));
1182
1183 self.seq_nr = self.seq_nr.wrapping_add(1);
1187
1188 Ok(self.state_packet.clone())
1189 }
1190 (SocketState::Connected, PacketType::Syn) if self.connected_to == src => {
1191 Ok(self.state_packet.clone())
1195 }
1196 (_, PacketType::Syn) => {
1197 Ok(Some(self.prepare_reply(packet, PacketType::Reset)))
1198 }
1199 (SocketState::SynSent, PacketType::State) => {
1200 self.connected_to = src;
1201 self.ack_nr = packet.seq_nr();
1202 self.seq_nr += 1;
1203 self.state = SocketState::Connected;
1204 self.last_acked = packet.ack_nr();
1205 self.last_dropped = packet.seq_nr();
1206 self.last_acked_timestamp = now_microseconds();
1207 Ok(None)
1208 }
1209 (SocketState::SynSent, _) => Err(Error::from(SocketError::InvalidReply)),
1210 (SocketState::Connected, PacketType::Data) |
1211 (SocketState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)),
1212 (SocketState::Connected, PacketType::State) => {
1213 self.handle_state_packet(packet);
1214 Ok(None)
1215 }
1216 (SocketState::Connected, PacketType::Fin) |
1217 (SocketState::FinSent, PacketType::Fin) => {
1218 if packet.ack_nr() < self.seq_nr {
1219 debug!("FIN received but there are missing acknowledgements for sent packets");
1220 }
1221 let mut reply = self.prepare_reply(packet, PacketType::State);
1222 if packet.seq_nr().wrapping_sub(self.ack_nr) > 1 {
1223 debug!("current ack_nr ({}) is behind received packet seq_nr ({})",
1224 self.ack_nr,
1225 packet.seq_nr());
1226
1227 let sack = self.build_selective_ack();
1229
1230 if sack.len() > 0 {
1231 reply.set_sack(sack);
1232 }
1233 }
1234
1235 self.state = SocketState::Closed;
1237 Ok(Some(reply))
1238 }
1239 (SocketState::FinSent, PacketType::State) => {
1240 if packet.ack_nr() == self.seq_nr {
1241 self.state = SocketState::Closed;
1242 } else {
1243 self.handle_state_packet(packet);
1244 }
1245 Ok(None)
1246 }
1247 (_, PacketType::Reset) => {
1248 self.state = SocketState::ResetReceived;
1249 Err(Error::from(SocketError::ConnectionReset))
1250 }
1251 (state, ty) => {
1252 let message = format!("Unimplemented handling for ({:?},{:?})", state, ty);
1253 debug!("{}", message);
1254 Err(Error::new(ErrorKind::Other, message))
1255 }
1256 }
1257 }
1258
1259 fn handle_data_packet(&mut self, packet: &Packet) -> Option<Packet> {
1260 let packet_type = if self.state == SocketState::FinSent {
1262 PacketType::Fin
1263 } else {
1264 PacketType::State
1265 };
1266 let mut reply = self.prepare_reply(packet, packet_type);
1267
1268 if packet.seq_nr().wrapping_sub(self.ack_nr) > 1 {
1269 debug!("current ack_nr ({}) is behind received packet seq_nr ({})",
1270 self.ack_nr,
1271 packet.seq_nr());
1272
1273 let sack = self.build_selective_ack();
1275
1276 if sack.len() > 0 {
1277 reply.set_sack(sack);
1278 }
1279 }
1280
1281 Some(reply)
1282 }
1283
1284 fn queuing_delay(&self) -> i64 {
1285 let filtered_current_delay = self.filtered_current_delay();
1286 let min_base_delay = self.min_base_delay();
1287 let queuing_delay = filtered_current_delay - min_base_delay;
1288
1289 debug!("filtered_current_delay: {}", filtered_current_delay);
1290 debug!("min_base_delay: {}", min_base_delay);
1291 debug!("queuing_delay: {}", queuing_delay);
1292
1293 queuing_delay
1294 }
1295
1296 fn update_congestion_window(&mut self, off_target: f64, bytes_newly_acked: u32) {
1316 let flightsize = self.curr_window;
1317
1318 let cwnd_increase = GAIN * off_target * bytes_newly_acked as f64 * MSS as f64;
1319 let cwnd_increase = cwnd_increase / self.cwnd as f64;
1320 debug!("cwnd_increase: {}", cwnd_increase);
1321
1322 self.cwnd = (self.cwnd as f64 + cwnd_increase) as u32;
1323 let max_allowed_cwnd = flightsize + ALLOWED_INCREASE * MSS;
1324 self.cwnd = min(self.cwnd, max_allowed_cwnd);
1325 self.cwnd = max(self.cwnd, MIN_CWND * MSS);
1326
1327 debug!("cwnd: {}", self.cwnd);
1328 debug!("max_allowed_cwnd: {}", max_allowed_cwnd);
1329 }
1330
1331 fn handle_state_packet(&mut self, packet: &Packet) {
1332 if packet.ack_nr() == self.last_acked {
1333 self.duplicate_ack_count += 1;
1334 } else {
1335 self.last_acked = packet.ack_nr();
1336 self.last_acked_timestamp = now_microseconds();
1337 self.duplicate_ack_count = 1;
1338 }
1339
1340 if let Some(index) = self.send_window.iter().position(|p| packet.ack_nr() == p.seq_nr()) {
1342 let bytes_newly_acked = self.send_window
1346 .iter()
1347 .take(index + 1)
1348 .fold(0, |acc, p| acc + p.len());
1349
1350 let now = now_microseconds() as i64;
1352 let our_delay = now - self.send_window[index].timestamp_microseconds() as i64;
1353 debug!("our_delay: {}", our_delay);
1354 self.update_base_delay(our_delay, now);
1355 self.update_current_delay(our_delay, now);
1356
1357 let off_target: f64 = (TARGET as f64 - self.queuing_delay() as f64) / TARGET as f64;
1358 debug!("off_target: {}", off_target);
1359
1360 self.update_congestion_window(off_target, bytes_newly_acked as u32);
1361
1362 let rtt = (TARGET - off_target as i64) / 1000; self.update_congestion_timeout(rtt as i32);
1365 }
1366
1367 let mut packet_loss_detected: bool = !self.send_window.is_empty() &&
1368 self.duplicate_ack_count == 3;
1369
1370 for extension in packet.extensions.iter() {
1372 if extension.get_type() == ExtensionType::SelectiveAck {
1373 if extension.iter().count_ones() >= 3 {
1376 self.resend_lost_packet(packet.ack_nr() + 1);
1377 packet_loss_detected = true;
1378 }
1379
1380 if let Some(last_seq_nr) = self.send_window.last().map(Packet::seq_nr) {
1381 for seq_nr in extension.iter()
1382 .enumerate()
1383 .filter(|&(_idx, received)| !received)
1384 .map(|(idx, _received)| {
1385 packet.ack_nr() + 2 + idx as u16
1386 })
1387 .take_while(|&seq_nr| seq_nr < last_seq_nr) {
1388 debug!("SACK: packet {} lost", seq_nr);
1389 self.resend_lost_packet(seq_nr);
1390 packet_loss_detected = true;
1391 }
1392 }
1393 } else {
1394 debug!("Unknown extension {:?}, ignoring", extension.get_type());
1395 }
1396 }
1397
1398 if !self.send_window.is_empty() && self.duplicate_ack_count == 3 &&
1402 !packet.extensions.iter().any(|ext| ext.get_type() == ExtensionType::SelectiveAck) {
1403 self.resend_lost_packet(packet.ack_nr().wrapping_add(1));
1404 }
1405
1406 if packet_loss_detected {
1408 debug!("packet loss detected, halving congestion window");
1409 self.cwnd = max(self.cwnd / 2, MIN_CWND * MSS);
1410 debug!("cwnd: {}", self.cwnd);
1411 }
1412
1413 self.advance_send_window();
1415 }
1416
1417 fn insert_into_buffer(&mut self, packet: Packet) {
1426 if self.incoming_buffer.last().map(|p| packet.seq_nr() > p.seq_nr()).unwrap_or(false) {
1429 self.incoming_buffer.push(packet);
1430 } else {
1431 let i = self.incoming_buffer.iter().filter(|p| p.seq_nr() < packet.seq_nr()).count();
1433
1434 if self.incoming_buffer.get(i).map(|p| p.seq_nr() != packet.seq_nr()).unwrap_or(true) {
1435 self.incoming_buffer.insert(i, packet);
1436 }
1437 }
1438 }
1439}
1440
1441impl Drop for UtpSocket {
1442 fn drop(&mut self) {
1443 let _ = self.close();
1444 }
1445}
1446
1447pub struct UtpListener {
1473 socket: UdpSocket,
1475}
1476
1477impl UtpListener {
1478 pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<UtpListener> {
1487 UdpSocket::bind(addr).and_then(|s| Ok(UtpListener { socket: s }))
1488 }
1489
1490 pub fn accept(&self) -> Result<(UtpSocket, SocketAddr)> {
1498 let mut buf = [0; BUF_SIZE];
1499
1500 match self.socket.recv_from(&mut buf) {
1501 Ok((nread, src)) => {
1502 let packet = try!(Packet::from_bytes(&buf[..nread])
1503 .or(Err(SocketError::InvalidPacket)));
1504
1505 if packet.get_type() != PacketType::Syn {
1507 return Err(Error::from(SocketError::InvalidPacket));
1508 }
1509
1510 let inner_socket = self.socket.local_addr().and_then(|addr| {
1512 match addr {
1513 SocketAddr::V4(_) => UdpSocket::bind("0.0.0.0:0"),
1514 SocketAddr::V6(_) => UdpSocket::bind(":::0"),
1515 }
1516 });
1517
1518 let mut socket = try!(inner_socket.map(|s| UtpSocket::from_raw_parts(s, src)));
1519
1520 match socket.handle_packet(&packet, src) {
1522 Ok(Some(reply)) => try!(socket.socket.send_to(&reply.to_bytes()[..], src)),
1523 Ok(None) => return Err(Error::from(SocketError::InvalidPacket)),
1524 Err(e) => return Err(e),
1525 };
1526
1527 Ok((socket, src))
1528 }
1529 Err(e) => Err(e),
1530 }
1531 }
1532
1533 pub fn incoming(&self) -> Incoming {
1537 Incoming { listener: self }
1538 }
1539
1540 pub fn local_addr(&self) -> Result<SocketAddr> {
1542 self.socket.local_addr()
1543 }
1544}
1545
1546pub struct Incoming<'a> {
1547 listener: &'a UtpListener,
1548}
1549
1550impl<'a> Iterator for Incoming<'a> {
1551 type Item = Result<(UtpSocket, SocketAddr)>;
1552
1553 fn next(&mut self) -> Option<Result<(UtpSocket, SocketAddr)>> {
1554 Some(self.listener.accept())
1555 }
1556}
1557
1558#[cfg(test)]
1559mod test {
1560 use std::thread;
1561 use std::net::ToSocketAddrs;
1562 use std::io::ErrorKind;
1563 use super::{UtpSocket, UtpListener, SocketState, BUF_SIZE, take_address};
1564 use packet::{Packet, PacketType, Encodable, Decodable};
1565 use util::now_microseconds;
1566 use rand;
1567
// Unwraps a `Result`, panicking with the debug representation of the error.
macro_rules! iotry {
    ($result:expr) => {
        match $result {
            Ok(value) => value,
            Err(error) => panic!("{:?}", error),
        }
    };
}
1571
/// Returns a port number that is distinct for every call within the process.
///
/// Ports are handed out sequentially starting at 9600.
fn next_test_port() -> u16 {
    use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};

    const BASE_PORT: u16 = 9600;
    // Process-wide counter of ports handed out so far.
    static NEXT_OFFSET: AtomicUsize = ATOMIC_USIZE_INIT;

    let offset = NEXT_OFFSET.fetch_add(1, Ordering::Relaxed);
    BASE_PORT + offset as u16
}
1578
1579 fn next_test_ip4<'a>() -> (&'a str, u16) {
1580 ("127.0.0.1", next_test_port())
1581 }
1582
1583 fn next_test_ip6<'a>() -> (&'a str, u16) {
1584 ("::1", next_test_port())
1585 }
1586
/// Connects and closes a client over IPv4 and checks the state of both ends.
#[test]
fn test_socket_ipv4() {
    let addr = next_test_ip4();

    let mut server = iotry!(UtpSocket::bind(addr));
    assert_eq!(server.state, SocketState::New);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        assert_eq!(client.state, SocketState::Connected);
        // The two connection ids are expected to differ by exactly one.
        assert_eq!(client.sender_connection_id,
                   client.receiver_connection_id + 1);
        let resolved = addr.to_socket_addrs().unwrap().next().unwrap();
        assert_eq!(client.connected_to, resolved);
        iotry!(client.close());
        drop(client);
    });

    let mut buf = [0u8; BUF_SIZE];
    // The outcome is only printed, not asserted on.
    let outcome = server.recv_from(&mut buf);
    println!("{:?}", outcome);

    // On the server side the id relationship is mirrored.
    assert_eq!(server.receiver_connection_id,
               server.sender_connection_id + 1);
    assert_eq!(server.state, SocketState::Closed);
    drop(server);

    assert!(client_thread.join().is_ok());
}
1619
/// Connects and closes a client over IPv6 and checks the state of both ends.
#[test]
fn test_socket_ipv6() {
    let addr = next_test_ip6();

    let mut server = iotry!(UtpSocket::bind(addr));
    assert_eq!(server.state, SocketState::New);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        assert_eq!(client.state, SocketState::Connected);
        // The two connection ids are expected to differ by exactly one.
        assert_eq!(client.sender_connection_id,
                   client.receiver_connection_id + 1);
        let resolved = addr.to_socket_addrs().unwrap().next().unwrap();
        assert_eq!(client.connected_to, resolved);
        iotry!(client.close());
        drop(client);
    });

    let mut buf = [0u8; BUF_SIZE];
    // The outcome is only printed, not asserted on.
    let outcome = server.recv_from(&mut buf);
    println!("{:?}", outcome);

    // On the server side the id relationship is mirrored.
    assert_eq!(server.receiver_connection_id,
               server.sender_connection_id + 1);
    assert_eq!(server.state, SocketState::Closed);
    drop(server);

    assert!(client_thread.join().is_ok());
}
1652
/// Transfers 16 MiB of random data between two peers that connect to each
/// other with `rendezvous_connect`, using randomly sized reads and writes.
#[test]
fn test_rendezvous_connect() {
    use std::net::{UdpSocket, Ipv4Addr, SocketAddrV4};

    const BUF_LEN: u32 = 16777216;

    let peer1_udp_socket = iotry!(UdpSocket::bind("0.0.0.0:0"));
    let peer2_udp_socket = iotry!(UdpSocket::bind("0.0.0.0:0"));

    // Both sockets are bound to the wildcard address; peers dial loopback.
    let loopback = Ipv4Addr::new(127, 0, 0, 1);
    let peer1_addr = SocketAddrV4::new(loopback, iotry!(peer1_udp_socket.local_addr()).port());
    let peer2_addr = SocketAddrV4::new(loopback, iotry!(peer2_udp_socket.local_addr()).port());

    let tx_buffer: Vec<u8> = (0..BUF_LEN).map(|_| rand::random::<u8>()).collect();

    let sender = thread::spawn(move || {
        let mut peer1 = iotry!(UtpSocket::rendezvous_connect(peer1_udp_socket, peer2_addr));
        let mut offset = 0;
        while offset < tx_buffer.len() {
            // Chunk sizes range from 1 to 65536 bytes.
            let wanted = rand::random::<u16>() as usize + 1;
            let end = ::std::cmp::min(tx_buffer.len(), offset + wanted);
            offset += peer1.send_to(&tx_buffer[offset..end]).unwrap();
        }
        peer1.flush().unwrap();
        let _ = peer1.close();
        tx_buffer
    });

    let mut peer2 = iotry!(UtpSocket::rendezvous_connect(peer2_udp_socket, peer1_addr));
    let mut rx_buffer: Vec<u8> = vec![0u8; BUF_LEN as usize];
    let mut offset = 0;
    while offset < rx_buffer.len() {
        let wanted = rand::random::<u16>() as usize + 1;
        thread::sleep(::std::time::Duration::from_millis(1));
        let end = ::std::cmp::min(rx_buffer.len(), offset + wanted);
        let (received, _) = peer2.recv_from(&mut rx_buffer[offset..end]).unwrap();
        offset += received;
    }

    let tx_buffer = sender.join().unwrap();
    assert_eq!(tx_buffer, rx_buffer);
    let _ = peer2.close();
}
1699
/// Receiving on a socket that is already closed must yield zero bytes and
/// leave the socket closed.
#[test]
fn test_recvfrom_on_closed_socket() {
    let addr = next_test_ip4();

    let mut server = iotry!(UtpSocket::bind(addr));
    assert_eq!(server.state, SocketState::New);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        assert_eq!(client.state, SocketState::Connected);
        assert!(client.close().is_ok());
    });

    let mut buf = [0u8; BUF_SIZE];
    // First receive: its result is ignored, only the resulting state matters.
    let _first = server.recv_from(&mut buf);
    assert_eq!(server.state, SocketState::Closed);

    // Second receive on the closed socket.
    let second = server.recv_from(&mut buf);
    match second {
        Ok((0, _src)) => {}
        e => panic!("Expected Ok(0), got {:?}", e),
    }
    assert_eq!(server.state, SocketState::Closed);

    assert!(client_thread.join().is_ok());
}
1727
/// Sending on a socket that is already closed must fail with `NotConnected`.
#[test]
fn test_sendto_on_closed_socket() {
    let addr = next_test_ip4();

    let mut server = iotry!(UtpSocket::bind(addr));
    assert_eq!(server.state, SocketState::New);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        assert_eq!(client.state, SocketState::Connected);
        iotry!(client.close());
    });

    let mut buf = [0u8; BUF_SIZE];
    // Receive until the peer's close has been processed.
    let (_read, _src) = iotry!(server.recv_from(&mut buf));
    assert_eq!(server.state, SocketState::Closed);

    let send_result = server.send_to(&buf);
    match send_result {
        Err(ref e) if e.kind() == ErrorKind::NotConnected => (),
        v => panic!("expected {:?}, got {:?}", ErrorKind::NotConnected, v),
    }

    assert!(client_thread.join().is_ok());
}
1754
/// Checks that the client's acknowledgement number trails the server's
/// sequence number by one after connecting, and that closing does not
/// change it.
#[test]
fn test_acks_on_socket() {
    use std::sync::mpsc::channel;

    let addr = next_test_ip4();
    // Channel used to hand the server's sequence number to the main thread.
    let (seq_tx, seq_rx) = channel();

    let mut server = iotry!(UtpSocket::bind(addr));

    let server_thread = thread::spawn(move || {
        let mut buf = [0u8; BUF_SIZE];
        let _handshake = server.recv(&mut buf, false);
        seq_tx.send(server.seq_nr).unwrap();

        iotry!(server.recv_from(&mut buf));

        drop(server);
    });

    let mut client = iotry!(UtpSocket::connect(addr));
    assert_eq!(client.state, SocketState::Connected);

    let server_seq_nr = seq_rx.recv().unwrap();
    let ack_before_close = client.ack_nr;
    assert!(ack_before_close != 0);
    assert!(ack_before_close.wrapping_add(1) == server_seq_nr);

    assert!(client.close().is_ok());
    assert!(client.ack_nr == ack_before_close);
    drop(client);

    assert!(server_thread.join().is_ok());
}
1791
/// Feeds a SYN, a DATA and a FIN packet directly into `handle_packet` and
/// checks the header fields of each reply.
#[test]
fn test_handle_packet() {
    // Asserts that a reply was produced and returns it.
    fn reply_of(result: ::std::io::Result<Option<Packet>>) -> Packet {
        assert!(result.is_ok());
        let maybe_reply = result.unwrap();
        assert!(maybe_reply.is_some());
        maybe_reply.unwrap()
    }

    let initial_connection_id: u16 = rand::random();
    let sender_connection_id = initial_connection_id + 1;
    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let client_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let mut socket = iotry!(UtpSocket::bind(server_addr));

    // --- SYN ---
    let mut syn = Packet::new();
    syn.set_wnd_size(BUF_SIZE as u32);
    syn.set_type(PacketType::Syn);
    syn.set_connection_id(initial_connection_id);

    let syn_reply = reply_of(socket.handle_packet(&syn, client_addr));
    assert_eq!(syn_reply.get_type(), PacketType::State);
    assert_eq!(syn_reply.connection_id(), syn.connection_id());
    assert_eq!(syn_reply.ack_nr(), syn.seq_nr());
    assert!(syn_reply.payload.is_empty());

    // --- DATA ---
    let mut data = Packet::new();
    data.set_type(PacketType::Data);
    data.set_connection_id(sender_connection_id);
    data.set_seq_nr(syn.seq_nr() + 1);
    data.set_ack_nr(syn_reply.seq_nr());

    let data_reply = reply_of(socket.handle_packet(&data, client_addr));
    assert_eq!(data_reply.get_type(), PacketType::State);
    assert_eq!(data_reply.connection_id(), initial_connection_id);
    assert_eq!(data_reply.connection_id(), data.connection_id() - 1);
    assert_eq!(data_reply.ack_nr(), data.seq_nr());
    assert!(data_reply.payload.is_empty());
    // This reply's sequence number is one past the previous reply's.
    assert_eq!(data_reply.seq_nr(), syn_reply.seq_nr().wrapping_add(1));

    // --- FIN ---
    let mut fin = Packet::new();
    fin.set_type(PacketType::Fin);
    fin.set_connection_id(sender_connection_id);
    fin.set_seq_nr(data.seq_nr() + 1);
    fin.set_ack_nr(data_reply.seq_nr());

    let fin_reply = reply_of(socket.handle_packet(&fin, client_addr));
    assert_eq!(fin_reply.get_type(), PacketType::State);
    assert_eq!(fin.seq_nr(), data.seq_nr() + 1);
    // This reply keeps the previous reply's sequence number.
    assert_eq!(fin_reply.seq_nr(), data_reply.seq_nr());
    assert_eq!(fin_reply.ack_nr(), fin.seq_nr());
}
1901
/// A STATE packet sent after the handshake, even when repeated, must not
/// produce any reply.
#[test]
fn test_response_to_keepalive_ack() {
    let initial_connection_id: u16 = rand::random();
    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let client_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let mut socket = iotry!(UtpSocket::bind(server_addr));

    // Open the connection with a SYN.
    let mut syn = Packet::new();
    syn.set_wnd_size(BUF_SIZE as u32);
    syn.set_type(PacketType::Syn);
    syn.set_connection_id(initial_connection_id);

    let handshake = socket.handle_packet(&syn, client_addr);
    assert!(handshake.is_ok());
    let handshake = handshake.unwrap();
    assert!(handshake.is_some());
    let syn_reply = handshake.unwrap();
    assert_eq!(syn_reply.get_type(), PacketType::State);

    // Build the keepalive acknowledgement.
    let mut keepalive = Packet::new();
    keepalive.set_wnd_size(BUF_SIZE as u32);
    keepalive.set_type(PacketType::State);
    keepalive.set_connection_id(initial_connection_id);
    keepalive.set_seq_nr(syn.seq_nr() + 1);
    keepalive.set_ack_nr(syn_reply.seq_nr());

    // Deliver it twice; neither delivery may trigger a reply.
    for _ in 0..2 {
        let outcome = socket.handle_packet(&keepalive, client_addr);
        assert!(outcome.is_ok());
        let outcome = outcome.unwrap();
        assert!(outcome.is_none());
    }

    // Set the state to Closed by hand before the socket is dropped.
    socket.state = SocketState::Closed;
}
1956
/// A packet carrying a different connection id must be answered with RESET.
#[test]
fn test_response_to_wrong_connection_id() {
    // Asserts that a reply was produced and returns it.
    fn reply_of(result: ::std::io::Result<Option<Packet>>) -> Packet {
        assert!(result.is_ok());
        let maybe_reply = result.unwrap();
        assert!(maybe_reply.is_some());
        maybe_reply.unwrap()
    }

    let initial_connection_id: u16 = rand::random();
    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let client_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let mut socket = iotry!(UtpSocket::bind(server_addr));

    // Open the connection with a SYN.
    let mut syn = Packet::new();
    syn.set_wnd_size(BUF_SIZE as u32);
    syn.set_type(PacketType::Syn);
    syn.set_connection_id(initial_connection_id);

    let syn_reply = reply_of(socket.handle_packet(&syn, client_addr));
    assert_eq!(syn_reply.get_type(), PacketType::State);

    // Derive another connection id from the initial one.
    let new_connection_id = initial_connection_id.wrapping_mul(2);

    let mut stray = Packet::new();
    stray.set_wnd_size(BUF_SIZE as u32);
    stray.set_type(PacketType::State);
    stray.set_connection_id(new_connection_id);

    let reset = reply_of(socket.handle_packet(&stray, client_addr));
    assert_eq!(reset.get_type(), PacketType::Reset);
    assert_eq!(reset.ack_nr(), stray.seq_nr());

    // Set the state to Closed by hand before the socket is dropped.
    socket.state = SocketState::Closed;
}
2005
/// Delivers two DATA packets in reverse order: the early arrival must not be
/// acknowledged by its own sequence number.
#[test]
fn test_unordered_packets() {
    // Asserts that a reply was produced and returns it.
    fn reply_of(result: ::std::io::Result<Option<Packet>>) -> Packet {
        assert!(result.is_ok());
        let maybe_reply = result.unwrap();
        assert!(maybe_reply.is_some());
        maybe_reply.unwrap()
    }

    let initial_connection_id: u16 = rand::random();
    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let client_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let mut socket = iotry!(UtpSocket::bind(server_addr));

    // Open the connection with a SYN.
    let mut syn = Packet::new();
    syn.set_wnd_size(BUF_SIZE as u32);
    syn.set_type(PacketType::Syn);
    syn.set_connection_id(initial_connection_id);

    let syn_reply = reply_of(socket.handle_packet(&syn, client_addr));
    assert_eq!(syn_reply.get_type(), PacketType::State);

    // Two consecutive data packets following the SYN.
    let mut first = Packet::new();
    first.set_wnd_size(BUF_SIZE as u32);
    first.set_type(PacketType::Data);
    first.set_connection_id(initial_connection_id);
    first.set_seq_nr(syn.seq_nr() + 1);
    first.set_ack_nr(syn_reply.seq_nr());
    first.payload = vec![1, 2, 3];

    let mut second = Packet::new();
    second.set_wnd_size(BUF_SIZE as u32);
    second.set_type(PacketType::Data);
    second.set_connection_id(initial_connection_id);
    second.set_seq_nr(syn.seq_nr() + 2);
    second.set_ack_nr(syn_reply.seq_nr());
    second.payload = vec![4, 5, 6];

    let window: Vec<Packet> = vec![first, second];

    // The later packet arrives first.
    let early_reply = reply_of(socket.handle_packet(&window[1], client_addr));
    assert!(early_reply.ack_nr() != window[1].seq_nr());

    // Then the missing one; only the presence of a reply is checked.
    let _late_reply = reply_of(socket.handle_packet(&window[0], client_addr));

    // Set the state to Closed by hand before the socket is dropped.
    socket.state = SocketState::Closed;
}
2075
/// Sends four DATA packets out of order followed by a FIN over the raw UDP
/// socket and checks that the server reassembles the payload in order.
#[test]
fn test_socket_unordered_packets() {
    let addr = next_test_ip4();

    let mut server = iotry!(UtpSocket::bind(addr));
    assert_eq!(server.state, SocketState::New);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        assert_eq!(client.state, SocketState::Connected);
        assert_eq!(client.sender_connection_id,
                   client.receiver_connection_id + 1);
        let raw = client.socket.try_clone().ok().expect("Error cloning internal UDP socket");
        let mut window: Vec<Packet> = Vec::new();

        // Bytes 1..=12 split into four packets of three bytes each.
        let payload: Vec<u8> = (1..13u8).collect();
        for chunk in payload.chunks(3) {
            let mut data = Packet::new();
            data.set_wnd_size(BUF_SIZE as u32);
            data.set_type(PacketType::Data);
            data.set_connection_id(client.sender_connection_id);
            data.set_seq_nr(client.seq_nr);
            data.set_ack_nr(client.ack_nr);
            data.payload = chunk.to_vec();
            window.push(data.clone());
            // Mirror the bookkeeping in the client's own send state.
            client.send_window.push(data.clone());
            client.seq_nr += 1;
            client.curr_window += data.len() as u32;
        }

        let mut fin = Packet::new();
        fin.set_wnd_size(BUF_SIZE as u32);
        fin.set_type(PacketType::Fin);
        fin.set_connection_id(client.sender_connection_id);
        fin.set_seq_nr(client.seq_nr);
        fin.set_ack_nr(client.ack_nr);
        window.push(fin);
        client.seq_nr += 1;

        // Data packets go out in reverse order, the FIN goes last.
        for &slot in [3usize, 2, 1, 0, 4].iter() {
            iotry!(raw.send_to(&window[slot].to_bytes()[..], addr));
        }

        // Read two datagrams back from the server.
        for _ in 0..2 {
            let mut scratch = [0; BUF_SIZE];
            iotry!(raw.recv_from(&mut scratch));
        }
    });

    let mut buf = [0; BUF_SIZE];
    let expected: Vec<u8> = (1..13u8).collect();
    let mut received: Vec<u8> = vec![];
    loop {
        match server.recv_from(&mut buf) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => received.extend(buf[..len].to_vec()),
            Err(e) => panic!("{:?}", e),
        }
    }

    assert_eq!(server.receiver_connection_id,
               server.sender_connection_id + 1);
    assert_eq!(server.state, SocketState::Closed);
    assert_eq!(received.len(), expected.len());
    assert_eq!(received, expected);

    assert!(client_thread.join().is_ok());
}
2147
/// Sends the same acknowledgement three times and checks that the client
/// answers by resending the data packet.
#[test]
fn test_response_to_triple_ack() {
    const LEN: usize = 1024;

    let addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(addr));

    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();
    assert_eq!(LEN, data.len());

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        iotry!(client.send_to(&to_send[..]));
        iotry!(client.close());
    });

    let mut buf = [0; BUF_SIZE];
    // Let the uTP layer handle the first incoming packet.
    iotry!(server.recv(&mut buf, false));

    // Read the data packet directly from the UDP socket.
    let (read, _src) = match server.socket.recv_from(&mut buf) {
        Ok(pair) => pair,
        Err(e) => panic!("{}", e),
    };
    let data_packet = iotry!(Packet::from_bytes(&buf[..read]));
    assert_eq!(data_packet.get_type(), PacketType::Data);
    assert_eq!(data_packet.payload, data);
    assert_eq!(data_packet.payload.len(), data.len());

    // Acknowledge the packet before the data packet, three times over.
    let mut ack = Packet::new();
    ack.set_wnd_size(BUF_SIZE as u32);
    ack.set_type(PacketType::State);
    ack.set_seq_nr(server.seq_nr);
    ack.set_ack_nr(data_packet.seq_nr() - 1);
    ack.set_connection_id(server.sender_connection_id);

    for _ in 0..3 {
        iotry!(server.socket.send_to(&ack.to_bytes()[..], server.connected_to));
    }

    let client_addr = server.connected_to;
    // The next datagram must be the same data packet again.
    let read = match server.socket.recv_from(&mut buf) {
        Ok((0, _)) => panic!("Received 0 bytes from socket"),
        Ok((read, _src)) => read,
        Err(e) => panic!("{}", e),
    };
    let resent = iotry!(Packet::from_bytes(&buf[..read]));
    assert_eq!(resent.get_type(), PacketType::Data);
    assert_eq!(resent.seq_nr(), data_packet.seq_nr());
    assert!(resent.payload == data_packet.payload);

    let outcome = server.handle_packet(&resent, client_addr);
    assert!(outcome.is_ok());
    let outcome = outcome.unwrap();
    assert!(outcome.is_some());
    let reply = outcome.unwrap();
    iotry!(server.socket.send_to(&reply.to_bytes()[..], server.connected_to));

    // One more receive through the uTP layer before joining the client.
    iotry!(server.recv_from(&mut buf));

    assert!(client_thread.join().is_ok());
}
2214
/// Swallows the first data packet at the UDP level and checks that, with a
/// short congestion timeout, the server still ends up receiving data.
#[test]
fn test_socket_timeout_request() {
    const LEN: usize = 512;

    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let client_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();

    let client = iotry!(UtpSocket::bind(client_addr));
    let mut server = iotry!(UtpSocket::bind(server_addr));
    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();

    assert_eq!(server.state, SocketState::New);
    assert_eq!(client.state, SocketState::New);

    assert_eq!(client.sender_connection_id,
               client.receiver_connection_id + 1);

    let client_thread = thread::spawn(move || {
        // A separate socket is created for the actual connection.
        let mut client = iotry!(UtpSocket::connect(server_addr));
        assert_eq!(client.state, SocketState::Connected);
        assert_eq!(client.connected_to, server_addr);
        iotry!(client.send_to(&to_send[..]));
        drop(client);
    });

    let mut buf = [0u8; BUF_SIZE];
    server.recv(&mut buf, false).unwrap();
    assert_eq!(server.receiver_connection_id,
               server.sender_connection_id + 1);
    assert_eq!(server.state, SocketState::Connected);

    // Consume one datagram behind the uTP layer's back.
    iotry!(server.socket.recv_from(&mut buf));

    // Shorten the timeout.
    server.congestion_timeout = 50;

    // Keep receiving until something other than zero bytes comes back.
    loop {
        match server.recv_from(&mut buf) {
            Ok((0, _)) => continue,
            Ok(_) => break,
            Err(e) => panic!("{}", e),
        }
    }

    drop(server);

    assert!(client_thread.join().is_ok());
}
2278
/// Inserts packets into the incoming buffer and checks ordering and the
/// rejection of a duplicate sequence number.
#[test]
fn test_sorted_buffer_insertion() {
    let addr = next_test_ip4();
    let mut socket = iotry!(UtpSocket::bind(addr));

    let mut probe = Packet::new();
    probe.set_seq_nr(1);

    assert!(socket.incoming_buffer.is_empty());

    // First packet.
    socket.insert_into_buffer(probe.clone());
    assert_eq!(socket.incoming_buffer.len(), 1);

    // Second packet, appended after the first.
    probe.set_seq_nr(2);
    probe.set_timestamp_microseconds(128);
    socket.insert_into_buffer(probe.clone());
    assert_eq!(socket.incoming_buffer.len(), 2);
    assert_eq!(socket.incoming_buffer[1].seq_nr(), 2);
    assert_eq!(socket.incoming_buffer[1].timestamp_microseconds(), 128);

    // Third packet, appended after the second.
    probe.set_seq_nr(3);
    probe.set_timestamp_microseconds(256);
    socket.insert_into_buffer(probe.clone());
    assert_eq!(socket.incoming_buffer.len(), 3);
    assert_eq!(socket.incoming_buffer[2].seq_nr(), 3);
    assert_eq!(socket.incoming_buffer[2].timestamp_microseconds(), 256);

    // Same sequence number as the second packet but a different timestamp:
    // the buffer must keep the packet it already has.
    probe.set_seq_nr(2);
    probe.set_timestamp_microseconds(456);
    socket.insert_into_buffer(probe.clone());
    assert_eq!(socket.incoming_buffer.len(), 3);
    assert_eq!(socket.incoming_buffer[1].seq_nr(), 2);
    assert_eq!(socket.incoming_buffer[1].timestamp_microseconds(), 128);
}
2317
/// Sends the same DATA packet twice and checks that the server delivers its
/// payload only once.
#[test]
fn test_duplicate_packet_handling() {
    let server_addr = next_test_ip4();
    let client_addr = next_test_ip4();

    let client = iotry!(UtpSocket::bind(client_addr));
    let mut server = iotry!(UtpSocket::bind(server_addr));

    assert_eq!(server.state, SocketState::New);
    assert_eq!(client.state, SocketState::New);

    assert_eq!(client.sender_connection_id,
               client.receiver_connection_id + 1);

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        assert_eq!(client.state, SocketState::Connected);

        let mut data = Packet::new();
        data.set_wnd_size(BUF_SIZE as u32);
        data.set_type(PacketType::Data);
        data.set_connection_id(client.sender_connection_id);
        data.set_seq_nr(client.seq_nr);
        data.set_ack_nr(client.ack_nr);
        data.payload = vec![1, 2, 3];

        // Same packet twice; only the timestamp is refreshed.
        for _ in 0..2 {
            data.set_timestamp_microseconds(now_microseconds());
            iotry!(client.socket.send_to(&data.to_bytes()[..], server_addr));
        }
        client.seq_nr += 1;

        // Read a single datagram back from the server.
        {
            let mut scratch = [0; BUF_SIZE];
            iotry!(client.socket.recv_from(&mut scratch));
        }

        iotry!(client.close());
    });

    let mut buf = [0u8; BUF_SIZE];
    iotry!(server.recv(&mut buf, false));
    assert_eq!(server.receiver_connection_id,
               server.sender_connection_id + 1);
    assert_eq!(server.state, SocketState::Connected);

    let expected: Vec<u8> = vec![1, 2, 3];
    let mut received: Vec<u8> = vec![];
    loop {
        match server.recv_from(&mut buf) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => received.extend(buf[..len].to_vec()),
            Err(e) => panic!("{:?}", e),
        }
    }
    assert_eq!(received.len(), expected.len());
    assert_eq!(received, expected);

    assert!(client_thread.join().is_ok());
}
2382
/// Transmits only every other DATA packet while recording all of them in the
/// send window, and checks that the server still receives the full payload.
#[test]
fn test_correct_packet_loss() {
    const LEN: usize = 1024 * 10;

    let addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(addr));

    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        let dst = client.connected_to;

        for (index, chunk) in to_send[..].chunks(BUF_SIZE).enumerate() {
            let mut outgoing = Packet::new();
            outgoing.set_seq_nr(client.seq_nr);
            outgoing.set_ack_nr(client.ack_nr);
            outgoing.set_connection_id(client.sender_connection_id);
            outgoing.set_timestamp_microseconds(now_microseconds());
            outgoing.payload = chunk.to_vec();
            outgoing.set_type(PacketType::Data);

            // Odd-numbered packets are never put on the wire.
            let transmit = index % 2 == 0;
            if transmit {
                iotry!(client.socket.send_to(&outgoing.to_bytes()[..], dst));
            }

            // Every packet is nevertheless recorded as sent.
            client.curr_window += outgoing.len() as u32;
            client.send_window.push(outgoing);
            client.seq_nr += 1;
        }

        iotry!(client.close());
    });

    let mut buf = [0; BUF_SIZE];
    let mut received: Vec<u8> = vec![];
    loop {
        match server.recv_from(&mut buf) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => received.extend(buf[..len].to_vec()),
            Err(e) => panic!("{}", e),
        }
    }
    assert_eq!(received.len(), data.len());
    assert_eq!(received, data);

    assert!(client_thread.join().is_ok());
}
2490
/// Receives a 1024 byte message through a 512 byte buffer and checks that no
/// data is lost.
#[test]
fn test_tolerance_to_small_buffers() {
    const LEN: usize = 1024;

    let addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(addr));

    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();

    let client_thread = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(addr));
        iotry!(client.send_to(&to_send[..]));
        iotry!(client.close());
    });

    let mut collected = Vec::new();
    while server.state != SocketState::Closed {
        // Half the size of the message that is being sent.
        let mut small_buffer = [0; 512];
        match server.recv_from(&mut small_buffer) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => collected.extend(small_buffer[..len].to_vec()),
            Err(e) => panic!("{}", e),
        }
    }

    assert_eq!(collected.len(), data.len());
    assert_eq!(collected, data);

    assert!(client_thread.join().is_ok());
}
2520
/// Sends several buffers' worth of data and checks that the payload arrives
/// intact and that the client's sequence number ends up below 50.
#[test]
fn test_sequence_number_rollover() {
    const LEN: usize = BUF_SIZE * 4;

    let server_addr = next_test_ip4();
    let client_addr = next_test_ip4();

    let mut server = iotry!(UtpSocket::bind(server_addr));

    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();

    let client_thread = thread::spawn(move || {
        let mut bound = iotry!(UtpSocket::bind(client_addr));

        // NOTE(review): the advanced sequence number is set on `bound`, but
        // the data below is sent through a separately connected socket --
        // confirm whether the rollover is really exercised.
        bound.seq_nr = ::std::u16::MAX - (to_send.len() / (BUF_SIZE * 2)) as u16;

        let mut client = iotry!(UtpSocket::connect(server_addr));
        iotry!(client.send_to(&to_send[..]));
        assert!(client.seq_nr < 50);
        iotry!(client.close());
    });

    let mut buf = [0; BUF_SIZE];
    let mut received: Vec<u8> = vec![];
    loop {
        match server.recv_from(&mut buf) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => received.extend(buf[..len].to_vec()),
            Err(e) => panic!("{}", e),
        }
    }
    assert_eq!(received.len(), data.len());
    assert_eq!(received, data);

    assert!(client_thread.join().is_ok());
}
2560
/// Dropping a socket that was bound but never used must not panic.
#[test]
fn test_drop_unused_socket() {
    let unused = iotry!(UtpSocket::bind(next_test_ip4()));

    drop(unused);
}
2569
/// Answers a connection attempt with an empty datagram and expects `connect`
/// to fail with `ErrorKind::Other`.
#[test]
fn test_invalid_packet_on_connect() {
    use std::net::UdpSocket;

    let addr = next_test_ip4();
    let fake_server = iotry!(UdpSocket::bind(addr));

    let server_thread = thread::spawn(move || {
        let mut buf = [0; BUF_SIZE];
        let (_len, client_addr) = match fake_server.recv_from(&mut buf) {
            Ok(pair) => pair,
            Err(_) => panic!(),
        };
        // Reply with a zero-length datagram.
        iotry!(fake_server.send_to(&[], client_addr));
    });

    match UtpSocket::connect(addr) {
        Ok(_) => panic!("Expected Err, got Ok"),
        Err(e) => {
            if e.kind() != ErrorKind::Other {
                panic!("Expected ErrorKind::Other, got {:?}", e);
            }
        }
    }

    assert!(server_thread.join().is_ok());
}
2594
/// Checks that `UtpSocket::connect` fails with `ErrorKind::TimedOut` when the peer answers
/// the connection attempt with a `Data` packet.
#[test]
fn test_receive_unexpected_reply_type_on_connect() {
    use std::net::UdpSocket;
    let server_addr = next_test_ip4();
    let server = iotry!(UdpSocket::bind(server_addr));

    // A plain UDP socket stands in for the remote peer: it waits for one datagram and replies
    // to its sender with a packet whose type is `Data`.
    let child = thread::spawn(move || {
        let mut buf = [0; BUF_SIZE];
        let mut packet = Packet::new();
        packet.set_type(PacketType::Data);

        match server.recv_from(&mut buf) {
            Ok((_len, client_addr)) => {
                iotry!(server.send_to(&packet.to_bytes()[..], client_addr));
            }
            // Surface the I/O error rather than panicking without any message.
            Err(e) => panic!("{:?}", e),
        }
    });

    match UtpSocket::connect(server_addr) {
        Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
        Err(e) => panic!("Expected ErrorKind::TimedOut, got {:?}", e),
        Ok(_) => panic!("Expected Err, got Ok"),
    }

    assert!(child.join().is_ok());
}
2622
/// Sends a `Syn` packet carrying the identifiers of an already established connection from a
/// second UDP socket and checks that the reply received on that socket is a `Reset`.
#[test]
fn test_receiving_syn_on_established_connection() {
    let server_addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(server_addr));

    // Server side: keep reading until a zero-length read is reported.
    let child = thread::spawn(move || {
        let mut buf = [0; BUF_SIZE];
        loop {
            let (read, _src) = match server.recv_from(&mut buf) {
                Ok(result) => result,
                Err(e) => panic!("{:?}", e),
            };
            if read == 0 {
                break;
            }
        }
    });

    // Establish the real connection, then forge a SYN that reuses its identifiers.
    let mut client = iotry!(UtpSocket::connect(server_addr));
    let mut syn = Packet::new();
    syn.set_wnd_size(BUF_SIZE as u32);
    syn.set_type(PacketType::Syn);
    syn.set_connection_id(client.sender_connection_id);
    syn.set_seq_nr(client.seq_nr);
    syn.set_ack_nr(client.ack_nr);

    // The forged SYN is sent from an unrelated UDP socket.
    let other_socket = iotry!(::std::net::UdpSocket::bind("0.0.0.0:0"));

    iotry!(other_socket.send_to(&syn.to_bytes()[..], server_addr));

    let mut buf = [0; BUF_SIZE];
    let len = match other_socket.recv_from(&mut buf) {
        Ok((len, _src)) => len,
        Err(e) => panic!("{:?}", e),
    };
    let reply = Packet::from_bytes(&buf[..len]).ok().unwrap();
    assert_eq!(reply.get_type(), PacketType::Reset);

    iotry!(client.close());

    assert!(child.join().is_ok());
}
2664
/// Sends a `Reset` packet over an established connection and checks that reading on the
/// server side fails with `ErrorKind::ConnectionReset`.
#[test]
fn test_receiving_reset_on_established_connection() {
    let server_addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(server_addr));

    let child = thread::spawn(move || {
        let client = iotry!(UtpSocket::connect(server_addr));

        // Build a RESET carrying the connection's identifiers and push it through the
        // client's underlying UDP socket.
        let mut reset = Packet::new();
        reset.set_wnd_size(BUF_SIZE as u32);
        reset.set_type(PacketType::Reset);
        reset.set_connection_id(client.sender_connection_id);
        reset.set_seq_nr(client.seq_nr);
        reset.set_ack_nr(client.ack_nr);
        iotry!(client.socket.send_to(&reset.to_bytes()[..], server_addr));

        // Block on the raw UDP socket until some datagram arrives.
        let mut buf = [0; BUF_SIZE];
        if let Err(e) = client.socket.recv_from(&mut buf) {
            panic!("{:?}", e);
        }
    });

    let mut buf = [0; BUF_SIZE];
    loop {
        match server.recv_from(&mut buf) {
            Ok((read, _src)) => {
                if read == 0 {
                    break;
                }
            }
            Err(e) => {
                // Expected outcome: the test ends here, without joining `child`.
                if e.kind() == ErrorKind::ConnectionReset {
                    return;
                }
                panic!("{:?}", e);
            }
        }
    }

    // Only reached when the stream ended without a reset being reported.
    assert!(child.join().is_ok());
    panic!("Should have received Reset");
}
2699
/// Emits a `Fin` packet from the server's underlying UDP socket while a transfer is in
/// progress and checks that the server still reads every byte the client sent.
#[test]
fn test_premature_fin() {
    let (server_addr, client_addr) = (next_test_ip4(), next_test_ip4());
    let mut server = iotry!(UtpSocket::bind(server_addr));

    const LEN: usize = BUF_SIZE * 4;
    let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
    let to_send = data.clone();

    let child = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        iotry!(client.send_to(&to_send[..]));
        iotry!(client.close());
    });

    let mut buf = [0; BUF_SIZE];

    // First low-level read on the server socket (presumably handles the incoming
    // connection -- TODO confirm against `recv`).
    iotry!(server.recv(&mut buf, false));

    // Hand-craft a FIN from the server's current connection state.
    let mut fin = Packet::new();
    fin.set_connection_id(server.sender_connection_id);
    fin.set_seq_nr(server.seq_nr);
    fin.set_ack_nr(server.ack_nr);
    fin.set_timestamp_microseconds(now_microseconds());
    fin.set_type(PacketType::Fin);
    // NOTE(review): `client_addr` is never bound in this test (the client only calls
    // `connect(server_addr)`), so confirm this FIN actually reaches the client.
    iotry!(server.socket.send_to(&fin.to_bytes()[..], client_addr));

    // Drain the server socket until a zero-length read is reported.
    let mut received: Vec<u8> = Vec::new();
    loop {
        let len = match server.recv_from(&mut buf) {
            Ok((0, _src)) => break,
            Ok((len, _src)) => len,
            Err(e) => panic!("{}", e),
        };
        received.extend(buf[..len].iter().cloned());
    }
    assert_eq!(received.len(), data.len());
    assert_eq!(received, data);

    assert!(child.join().is_ok());
}
2743
/// Feeds a series of `(timestamp, delay)` samples spanning a bit more than one minute into
/// `update_base_delay` and checks the recorded base delays and their minimum.
#[test]
fn test_base_delay_calculation() {
    // One minute, in microseconds.
    const MINUTE_IN_MICROSECONDS: i64 = 60_000_000;

    // Four samples at the very start, three more just past the one-minute mark.
    let samples = [(0, 10),
                   (1, 8),
                   (2, 12),
                   (3, 7),
                   (MINUTE_IN_MICROSECONDS + 1, 11),
                   (MINUTE_IN_MICROSECONDS + 2, 19),
                   (MINUTE_IN_MICROSECONDS + 3, 9)];
    let mut socket = UtpSocket::bind(next_test_ip4()).unwrap();

    for &(timestamp, delay) in samples.iter() {
        socket.update_base_delay(delay, timestamp + delay);
    }

    // Expected: the smallest delay of each of the two groups of samples.
    let expected = vec![7, 9];
    let actual: Vec<_> = socket.base_delays.iter().cloned().collect();
    assert_eq!(expected, actual);
    assert_eq!(socket.min_base_delay(), 7);
}
2766
/// Checks that a bound `UtpSocket` reports the address it was bound to.
#[test]
fn test_local_addr() {
    let expected = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let bound = UtpSocket::bind(expected).unwrap();

    assert!(bound.local_addr().is_ok());
    assert_eq!(bound.local_addr().unwrap(), expected);
}
2776
/// Checks that a bound `UtpListener` reports the address it was bound to.
#[test]
fn test_listener_local_addr() {
    let expected = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let bound = UtpListener::bind(expected).unwrap();

    assert!(bound.local_addr().is_ok());
    assert_eq!(bound.local_addr().unwrap(), expected);
}
2786
/// Checks `peer_addr` on the server socket at three points: before any traffic (error), after
/// the first read (the client's port), and after closing (error again).
#[test]
fn test_peer_addr() {
    use std::sync::mpsc::channel;
    let server_addr = next_test_ip4().to_socket_addrs().unwrap().next().unwrap();
    let mut server = UtpSocket::bind(server_addr).unwrap();
    // Channel over which the client thread reports its own local address.
    let (addr_tx, addr_rx) = channel();

    // Nothing has been received yet, so no peer is known.
    assert!(server.peer_addr().is_err());

    let child = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        let mut scratch = [0; 1024];
        iotry!(addr_tx.send(client.local_addr()));
        iotry!(client.recv_from(&mut scratch));
    });

    let mut scratch = [0; 1024];
    iotry!(server.recv(&mut scratch, false));

    // After the first read the peer must be known ...
    assert!(server.peer_addr().is_ok());
    // ... and its port must match the one the client reported for itself.
    let client_addr = addr_rx.recv().unwrap().unwrap();
    assert_eq!(server.peer_addr().unwrap().port(), client_addr.port());

    iotry!(server.close());

    // Once closed, the socket no longer reports a peer.
    assert!(server.peer_addr().is_err());

    assert!(child.join().is_ok());
}
2824
/// Exercises `take_address` with inputs it must accept and inputs it must reject.
#[test]
fn test_take_address() {
    // Accepted: "host:port" strings ...
    assert!(take_address("0.0.0.0:0").is_ok());
    assert!(take_address(":::0").is_ok());
    // ... and (host, port) pairs.
    assert!(take_address(("0.0.0.0", 0)).is_ok());
    assert!(take_address(("::", 0)).is_ok());
    assert!(take_address(("1.2.3.4", 5)).is_ok());

    // Rejected: out-of-range octet or port, empty input, free text, host name without port.
    assert!(take_address("999.0.0.0:0").is_err());
    assert!(take_address("1.2.3.4:70000").is_err());
    assert!(take_address("").is_err());
    assert!(take_address("this is not an address").is_err());
    assert!(take_address("no.dns.resolution.com").is_err());
}
2841
/// Checks that the server gives up with `ErrorKind::TimedOut` when the data packet it sent is
/// never acknowledged, after the peer has seen it `max_retransmission_retries` more times.
#[test]
fn test_connection_loss_data() {
    let server_addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(server_addr));
    // Shrink the congestion timeout (presumably so that retransmissions happen quickly).
    server.congestion_timeout = 1;
    let attempts = server.max_retransmission_retries;

    let child = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        iotry!(client.send_to(&[0]));
        // Flag the uTP socket as closed and keep reading straight from a clone of its UDP
        // socket, so nothing sent by the server is processed by `client` from here on.
        client.state = SocketState::Closed;
        let raw = client.socket.try_clone().unwrap();
        let mut buf = [0; BUF_SIZE];
        // The first datagram is read and discarded.
        iotry!(raw.recv_from(&mut buf));
        // Each of the following `attempts` datagrams must be a data packet.
        for _ in 0..attempts {
            let len = match raw.recv_from(&mut buf) {
                Ok((len, _src)) => len,
                Err(e) => panic!("{}", e),
            };
            let packet = Packet::from_bytes(&buf[..len]).unwrap();
            assert_eq!(packet.get_type(), PacketType::Data);
        }
    });

    let mut buf = [0; BUF_SIZE];
    iotry!(server.recv_from(&mut buf));

    iotry!(server.send_to(&[0]));

    // Waiting for a reply that never comes must end in a timeout.
    match server.recv(&mut buf, false) {
        Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
        other => panic!("Expected Err(TimedOut), got {:?}", other),
    }

    assert!(child.join().is_ok());
}
2885
/// Checks that `close` gives up with `ErrorKind::TimedOut` when the `Fin` packet is never
/// acknowledged, after the peer has seen it `max_retransmission_retries` more times.
#[test]
fn test_connection_loss_fin() {
    let server_addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(server_addr));
    // Shrink the congestion timeout (presumably so that retransmissions happen quickly).
    server.congestion_timeout = 1;
    let attempts = server.max_retransmission_retries;

    let child = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        iotry!(client.send_to(&[0]));
        // Flag the uTP socket as closed and keep reading straight from a clone of its UDP
        // socket, so nothing sent by the server is processed by `client` from here on.
        client.state = SocketState::Closed;
        let raw = client.socket.try_clone().unwrap();
        let mut buf = [0; BUF_SIZE];
        // The first datagram is read and discarded.
        iotry!(raw.recv_from(&mut buf));
        // Each of the following `attempts` datagrams must be a FIN packet.
        for _ in 0..attempts {
            let len = match raw.recv_from(&mut buf) {
                Ok((len, _src)) => len,
                Err(e) => panic!("{}", e),
            };
            let packet = Packet::from_bytes(&buf[..len]).unwrap();
            assert_eq!(packet.get_type(), PacketType::Fin);
        }
    });

    let mut buf = [0; BUF_SIZE];
    iotry!(server.recv_from(&mut buf));

    // Closing towards a peer that never answers must end in a timeout.
    match server.close() {
        Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
        other => panic!("Expected Err(TimedOut), got {:?}", other),
    }
    assert!(child.join().is_ok());
}
2925
/// Checks that a server waiting for data from a silent peer gives up with
/// `ErrorKind::TimedOut`, and that the peer only sees `State` packets in the meantime.
#[test]
fn test_connection_loss_waiting() {
    let server_addr = next_test_ip4();
    let mut server = iotry!(UtpSocket::bind(server_addr));
    // Shrink the congestion timeout (presumably so that the wait ends quickly).
    server.congestion_timeout = 1;
    let attempts = server.max_retransmission_retries;

    let child = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        iotry!(client.send_to(&[0]));
        // Flag the uTP socket as closed and keep reading straight from a clone of its UDP
        // socket, so nothing sent by the server is processed by `client` from here on.
        client.state = SocketState::Closed;
        let raw = client.socket.try_clone().unwrap();
        let seq_nr = client.seq_nr;
        let mut buf = [0; BUF_SIZE];
        // Every datagram must be a STATE packet acknowledging the last packet sent.
        for _ in 0..(3 * attempts) {
            let len = match raw.recv_from(&mut buf) {
                Ok((len, _src)) => len,
                Err(e) => panic!("{}", e),
            };
            let packet = iotry!(Packet::from_bytes(&buf[..len]));
            assert_eq!(packet.get_type(), PacketType::State);
            assert_eq!(packet.ack_nr(), seq_nr - 1);
        }
    });

    let mut buf = [0; BUF_SIZE];
    iotry!(server.recv_from(&mut buf));

    // A second read, for data that never arrives, must end in a timeout.
    match server.recv_from(&mut buf) {
        Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
        other => panic!("Expected Err(TimedOut), got {:?}", other),
    }
    assert!(child.join().is_ok());
}
2967
/// Number of messages each end of a connection sends in the network tests.
const NETWORK_MSG_COUNT: usize = 5;
/// Number of listening nodes created by `test_network`.
const NETWORK_NODE_COUNT: usize = 20;
2970
2971 fn test_network(exchange: fn(&mut UtpSocket) -> ()) {
2972 use std::net::SocketAddr;
2973 use std::thread::{JoinHandle, spawn};
2974
2975 const NODE_COUNT: usize = NETWORK_NODE_COUNT;
2976
2977 struct Node {
2978 listener: UtpListener,
2979 }
2980
2981 impl Node {
2982 fn new() -> Node {
2983 Node { listener: iotry!(UtpListener::bind("127.0.0.1:0")) }
2984 }
2985
2986 fn run(&mut self, exchange: fn(&mut UtpSocket) -> (), peer_addrs: Vec<SocketAddr>) {
2987 let connect_cnt = peer_addrs.len();
2988
2989 let connect_join_handle = spawn(move || {
2990 let mut send_jhs = Vec::<JoinHandle<()>>::new();
2991
2992 for peer_addr in peer_addrs {
2993 send_jhs.push(spawn(move || {
2994 let mut socket = iotry!(UtpSocket::connect(peer_addr));
2995 exchange(&mut socket);
2996 }));
2997 }
2998
2999 for jh in send_jhs {
3000 iotry!(jh.join());
3001 }
3002 });
3003
3004 let mut recv_jhs = Vec::<JoinHandle<()>>::new();
3005
3006 for _ in 0..NODE_COUNT-1-connect_cnt {
3007 let mut socket = iotry!(self.listener.accept()).0;
3008 recv_jhs.push(spawn(move || {
3009 exchange(&mut socket);
3010 }));
3011 }
3012
3013 for jh in recv_jhs {
3014 iotry!(jh.join());
3015 }
3016
3017 iotry!(connect_join_handle.join());
3018 }
3019 }
3020
3021 let mut nodes = Vec::<Node>::new();
3022
3023 for _ in 0..NODE_COUNT {
3024 nodes.push(Node::new());
3025 }
3026
3027 let listening_addrs = nodes.iter()
3028 .map(|n| iotry!(n.listener.local_addr()))
3029 .collect::<Vec<_>>();
3030
3031 let mut join_handles = Vec::<JoinHandle<()>>::new();
3032
3033 let mut ni: usize = 0;
3034 for mut node in nodes {
3035 let mut addrs = Vec::<SocketAddr>::new();
3036
3037 for ai in 0..listening_addrs.len() {
3038 if ai <= ni { continue }
3039 addrs.push(listening_addrs[ai].clone());
3040 }
3041
3042 join_handles.push(spawn(move || {
3043 node.run(exchange, addrs);
3044 }));
3045
3046 ni += 1;
3047 }
3048
3049 for handle in join_handles {
3050 iotry!(handle.join());
3051 }
3052 }
3053
/// Runs the mesh test 100 times with a strictly alternating send/receive exchange and no
/// read timeout.
#[test]
fn test_network_no_timeout() {
    static MSG_COUNT: usize = NETWORK_MSG_COUNT;

    /// Builds the 10-byte payload of message `i`: byte `j` holds `(i + j) as u8`.
    fn make_buf(i: usize) -> [u8; 10] {
        let mut buf = [0; 10];
        for (j, byte) in buf.iter_mut().enumerate() {
            *byte = (i + j) as u8;
        }
        buf
    }

    /// Sends message `i`, then expects to receive message `i` from the peer, for every `i`
    /// below `MSG_COUNT`.
    fn sequential_exchange(socket: &mut UtpSocket) {
        // Ports are only used to make panic messages easier to attribute.
        let from = socket.socket.local_addr().map(|addr| addr.port()).unwrap_or(0);
        let to = socket.connected_to.port();

        for i in 0..MSG_COUNT {
            let expected = make_buf(i);
            assert_eq!(iotry!(socket.send_to(&expected)), expected.len());

            let mut buf = [0; 10];
            let cnt = match socket.recv_from(&mut buf) {
                Ok((cnt, _)) => cnt,
                Err(err) => {
                    panic!("Recv error {:?}; from {:?} to {:?}", err, from, to);
                }
            };

            // An empty read on a socket that is no longer connected gets its own message.
            if cnt == 0 && socket.state != SocketState::Connected {
                panic!("socket is in an invalid state \"{:?}\" from {:?} to {:?}",
                       socket.state, from, to);
            }
            assert_eq!(cnt, 10);
            if buf != expected {
                panic!("expected {:?} but received {:?} in recv step {}",
                       expected,
                       buf,
                       i);
            }
        }
    }

    for i in 0..100 {
        println!("------ Testing Network iteration {}", i);
        test_network(sequential_exchange);
    }
}
3105
/// Runs the mesh test 100 times with a read timeout set on every socket; sends and receives
/// are retried until each side has sent and received `MSG_COUNT` messages.
#[test]
fn test_network_with_timeout() {
    static MSG_COUNT: usize = NETWORK_MSG_COUNT;

    /// Builds the 10-byte payload of message `i`: byte `j` holds `(i + j) as u8`.
    fn make_buf(i: usize) -> [u8; 10] {
        let mut buf = [0; 10];
        for (j, byte) in buf.iter_mut().enumerate() {
            *byte = (i + j) as u8;
        }
        buf
    }

    /// Interleaves sending and receiving, treating `ErrorKind::TimedOut` as "try again".
    fn timeout_exchange(socket: &mut UtpSocket) {
        socket.set_read_timeout(Some(50));
        let (mut recv_cnt, mut send_cnt) = (0, 0);

        // Ports are only used to make panic messages easier to attribute.
        let from = socket.socket.local_addr().map(|addr| addr.port()).unwrap_or(0);
        let to = socket.connected_to.port();

        // Keep going while either direction still has messages outstanding.
        while send_cnt < MSG_COUNT || recv_cnt < MSG_COUNT {
            if send_cnt < MSG_COUNT {
                let tx_buf = make_buf(send_cnt);

                match socket.send_to(&tx_buf) {
                    Ok(cnt) => {
                        assert_eq!(cnt, tx_buf.len());
                        send_cnt += 1;
                    }
                    Err(e) => {
                        // A timeout leaves `send_cnt` untouched, so the message is retried.
                        if e.kind() != ErrorKind::TimedOut {
                            panic!("{:?}", e);
                        }
                    }
                }
            }

            if recv_cnt < MSG_COUNT {
                let exp_buf = make_buf(recv_cnt);

                let mut buf = [0; 10];
                match socket.recv_from(&mut buf) {
                    // An empty read is tolerated only while the socket is still connected.
                    Ok((0, _)) => {
                        if socket.state != SocketState::Connected {
                            panic!("socket is in an invalid state \"{:?}\" from {:?} to {:?} in receive #{}",
                                   socket.state, from, to, recv_cnt);
                        }
                    }
                    Ok((cnt, _)) => {
                        assert_eq!(cnt, exp_buf.len());
                        assert_eq!(buf, exp_buf);
                        recv_cnt += 1;
                    }
                    Err(e) => {
                        // A timeout leaves `recv_cnt` untouched, so the read is retried.
                        if e.kind() != ErrorKind::TimedOut {
                            panic!("{:?} recv_cnt={} send_cnt={}", e, recv_cnt, send_cnt);
                        }
                    }
                }
            }
        }
    }

    for i in 0..100 {
        println!("------ Testing Network iteration {}", i);
        test_network(timeout_exchange);
    }
}
3179
/// Sends ten bytes from a connecting client to the socket accepted by a listener and checks
/// that they arrive unchanged.
#[test]
fn test_send_client_to_server() {
    static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
    let server_addr = iotry!(listener.local_addr());

    let client_t = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        let sent = iotry!(client.send_to(&TX_BUF));
        assert_eq!(sent, TX_BUF.len());
    });

    let mut server = iotry!(listener.accept()).0;

    let mut received = [0; 10];
    iotry!(server.recv_from(&mut received));
    assert_eq!(received, TX_BUF);

    assert!(client_t.join().is_ok());
}
3200
/// Sends ten bytes from the socket accepted by a listener to the connecting client and checks
/// that they arrive unchanged and that flushing the server socket succeeds.
#[test]
fn test_send_server_to_client() {
    static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
    let server_addr = iotry!(listener.local_addr());

    let client_t = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        let mut received = [0; 10];
        iotry!(client.recv_from(&mut received));
        assert_eq!(received, TX_BUF);
    });

    let mut server = iotry!(listener.accept()).0;

    let sent = iotry!(server.send_to(&TX_BUF));
    assert_eq!(sent, TX_BUF.len());
    assert!(server.flush().is_ok());

    assert!(client_t.join().is_ok());
}
3224
/// Exchanges the same ten bytes in both directions over one uTP connection: each side sends
/// first, then reads what the other side sent.
#[test]
fn test_data_exchange_utp() {
    static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
    let server_addr = iotry!(listener.local_addr());

    let client_t = thread::spawn(move || {
        let mut client = iotry!(UtpSocket::connect(server_addr));
        let sent = iotry!(client.send_to(&TX_BUF));
        assert_eq!(sent, TX_BUF.len());
        let mut received = [0; 10];
        iotry!(client.recv_from(&mut received));
        assert_eq!(received, TX_BUF);
    });

    let mut server = iotry!(listener.accept()).0;

    let sent = iotry!(server.send_to(&TX_BUF));
    assert_eq!(sent, TX_BUF.len());
    let mut received = [0; 10];
    iotry!(server.recv_from(&mut received));
    assert_eq!(received, TX_BUF);
    // The outcome of the final flush is deliberately not checked.
    let _ = server.flush();

    assert!(client_t.join().is_ok());
}
3251
/// TCP counterpart of the uTP data-exchange test: both ends write the same ten bytes and then
/// read what the other end wrote.
#[test]
fn test_data_exchange_tcp() {
    use std::net::{TcpListener, TcpStream};
    use std::io::{Read, Write};

    static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    let listener = iotry!(TcpListener::bind("127.0.0.1:0"));
    let server_addr = iotry!(listener.local_addr());

    let client_t = thread::spawn(move || {
        let mut client = iotry!(TcpStream::connect(server_addr));
        // `write_all` / `read_exact` loop until the whole buffer has been transferred; a
        // single `write` / `read` is allowed to transfer fewer bytes than requested.
        iotry!(client.write_all(&TX_BUF));
        let mut buf = [0; 10];
        iotry!(client.read_exact(&mut buf));
        assert_eq!(buf, TX_BUF);
    });

    let mut server = iotry!(listener.accept()).0;

    iotry!(server.write_all(&TX_BUF));
    let mut buf = [0; 10];
    // Fill the whole buffer before comparing, so a short read cannot cause a false failure.
    iotry!(server.read_exact(&mut buf));
    assert_eq!(buf, TX_BUF);

    assert!(client_t.join().is_ok());
}
3280
3281}