Skip to main content

utp/
socket.rs

1use std::cmp::{min, max};
2use std::collections::VecDeque;
3use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};
4use std::io::{Result, Error, ErrorKind};
5use util::{now_microseconds, ewma, Sequence};
6use packet::{Packet, PacketType, Encodable, Decodable, ExtensionType, HEADER_SIZE};
7use rand::{self, Rng};
8use time::SteadyTime;
9use time;
10use std::time::Duration;
11
12// For simplicity's sake, let us assume no packet will ever exceed the
13// Ethernet maximum transfer unit of 1500 bytes.
14const BUF_SIZE: usize = 1500;
15const GAIN: f64 = 1.0;
16const ALLOWED_INCREASE: u32 = 1;
17const TARGET: i64 = 100_000; // 100 milliseconds
18const MSS: u32 = 1400;
19const MIN_CWND: u32 = 2;
20const INIT_CWND: u32 = 2;
21const INITIAL_CONGESTION_TIMEOUT: u64 = 1000; // one second
22const MIN_CONGESTION_TIMEOUT: u64 = 500; // 500 ms
23const MAX_CONGESTION_TIMEOUT: u64 = 60_000; // one minute
24const BASE_HISTORY: usize = 10; // base delays history size
25const MAX_SYN_RETRIES: u32 = 5; // maximum connection retries
26const MAX_RETRANSMISSION_RETRIES: u32 = 5; // maximum retransmission retries
27
28// Maximum time (in microseconds) to wait for incoming packets when the send window is full
29const PRE_SEND_TIMEOUT: u32 = 500_000;
30
31// Maximum age of base delay sample (60 seconds)
32const MAX_BASE_DELAY_AGE: i64 = 60_000_000;
33
34#[derive(Debug)]
35pub enum SocketError {
36    ConnectionClosed,
37    ConnectionReset,
38    ConnectionTimedOut,
39    UserTimedOut,
40    InvalidAddress,
41    InvalidPacket,
42    InvalidReply,
43    NotConnected,
44}
45
46impl From<SocketError> for Error {
47    fn from(error: SocketError) -> Error {
48        use self::SocketError::*;
49        let (kind, message) = match error {
50            ConnectionClosed => (ErrorKind::NotConnected, "The socket is closed"),
51            ConnectionReset => {
52                (ErrorKind::ConnectionReset,
53                 "Connection reset by remote peer")
54            }
55            ConnectionTimedOut | UserTimedOut => (ErrorKind::TimedOut, "Connection timed out"),
56            InvalidAddress => (ErrorKind::InvalidInput, "Invalid address"),
57            InvalidPacket => (ErrorKind::Other, "Error parsing packet"),
58            InvalidReply => {
59                (ErrorKind::ConnectionRefused,
60                 "The remote peer sent an invalid reply")
61            }
62            NotConnected => (ErrorKind::NotConnected, "The socket is not connected"),
63        };
64        Error::new(kind, message)
65    }
66}
67
68#[derive(PartialEq, Eq, Debug, Copy, Clone)]
69enum SocketState {
70    New,
71    Connected,
72    SynSent,
73    FinSent,
74    ResetReceived,
75    Closed,
76}
77
78struct DelayDifferenceSample {
79    received_at: i64,
80    difference: i64,
81}
82
83/// Returns the first valid address in a `ToSocketAddrs` iterator.
84fn take_address<A: ToSocketAddrs>(addr: A) -> Result<SocketAddr> {
85    addr.to_socket_addrs()
86        .and_then(|mut it| it.next().ok_or(From::from(SocketError::InvalidAddress)))
87}
88
89fn unsafe_copy(src: &[u8], dst: &mut [u8]) -> usize {
90    let max_len = min(src.len(), dst.len());
91    unsafe {
92        use std::ptr::copy;
93        copy(src.as_ptr(), dst.as_mut_ptr(), max_len);
94    }
95    max_len
96}
97
98/// A structure that represents a uTP (Micro Transport Protocol) connection between a local socket
99/// and a remote socket.
100///
101/// The socket will be closed when the value is dropped (either explicitly or when it goes out of
102/// scope).
103///
104/// The default maximum retransmission retries is 5, which translates to about 16 seconds. It can be
105/// changed by assigning the desired maximum retransmission retries to a socket's
106/// `max_retransmission_retries` field. Notice that the initial congestion timeout is 500 ms and
107/// doubles with each timeout.
108///
109/// # Examples
110///
111/// ```no_run
112/// use utp::UtpSocket;
113///
114/// let mut socket = UtpSocket::bind("127.0.0.1:1234").expect("Error binding socket");
115///
116/// let mut buf = [0; 1000];
117/// let (amt, _src) = socket.recv_from(&mut buf).expect("Error receiving");
118///
119/// let mut buf = &mut buf[..amt];
120/// buf.reverse();
121/// let _ = socket.send_to(buf).expect("Error sending");
122///
123/// // Close the socket. You can either call `close` on the socket,
124/// // explicitly drop it or just let it go out of scope.
125/// socket.close();
126/// ```
127pub struct UtpSocket {
128    /// The wrapped UDP socket
129    socket: UdpSocket,
130
131    /// Remote peer
132    connected_to: SocketAddr,
133
134    /// Sender connection identifier
135    sender_connection_id: u16,
136
137    /// Receiver connection identifier
138    receiver_connection_id: u16,
139
140    /// Sequence number for the next packet
141    seq_nr: u16,
142
143    /// Sequence number of the latest acknowledged packet sent by the remote peer
144    ack_nr: u16,
145
146    /// Socket state
147    state: SocketState,
148
149    /// Received but not acknowledged packets
150    incoming_buffer: Vec<Packet>,
151
152    /// Sent but not yet acknowledged packets
153    send_window: Vec<Packet>,
154
155    /// Packets not yet sent
156    unsent_queue: VecDeque<Packet>,
157
158    /// How many ACKs did the socket receive for packet with sequence number equal to `ack_nr`
159    duplicate_ack_count: u32,
160
161    /// Sequence number of the latest packet the remote peer acknowledged
162    last_acked: u16,
163
164    /// Timestamp of the latest packet the remote peer acknowledged
165    last_acked_timestamp: u32,
166
167    /// Sequence number of the last packet removed from the incoming buffer
168    last_dropped: u16,
169
170    /// Round-trip time to remote peer
171    rtt: i32,
172
173    /// Variance of the round-trip time to the remote peer
174    rtt_variance: i32,
175
176    /// Data from the latest packet not yet returned in `recv_from`
177    pending_data: VecDeque<u8>,
178
179    /// Another buffer of data to be returned in recv_from
180    /// this comes before pending_data
181    read_ready_data: VecDeque<u8>,
182
183    /// Bytes in flight
184    curr_window: u32,
185
186    /// Window size of the remote peer
187    remote_wnd_size: u32,
188
189    /// Rolling window of packet delay to remote peer
190    base_delays: VecDeque<i64>,
191
192    /// Rolling window of the difference between sending a packet and receiving its acknowledgement
193    current_delays: Vec<DelayDifferenceSample>,
194
195    /// Difference between timestamp of the latest packet received and time of reception
196    their_delay: u32,
197
198    /// Start of the current minute for sampling purposes
199    last_rollover: i64,
200
201    /// Current congestion timeout in milliseconds
202    congestion_timeout: u64,
203
204    /// Congestion window in bytes
205    cwnd: u32,
206
207    /// Maximum retransmission retries
208    pub max_retransmission_retries: u32,
209
210    /// Used by `set_read_timeout`.
211    user_read_timeout: u64,
212
213    /// The last time congestion algorithm was updated/handled-a-timeout
214    last_congestion_update: SteadyTime,
215
216    retries: u32,
217
218    /// The first 'State' packet we sent if we are a server (it may
219    /// need to be resent if the network dropped it).
220    state_packet: Option<Packet>,
221
222    /// The last time we've sent something over the wire
223    last_msg_sent_timestamp: SteadyTime,
224}
225
226impl UtpSocket {
227    /// Creates a new UTP socket from the given UDP socket and the remote peer's address.
228    ///
229    /// The connection identifier of the resulting socket is randomly generated.
230    fn from_raw_parts(s: UdpSocket, src: SocketAddr) -> UtpSocket {
231        // Safely generate the two sequential connection identifiers.
232        // This avoids an overflow when the generated receiver identifier is the largest
233        // representable value in u16 and it is incremented to yield the corresponding sender
234        // identifier.
235        let (receiver_id, sender_id) =
236            || -> (u16, u16) {
237                let mut rng = rand::thread_rng();
238                loop {
239                    let id = rng.gen::<u16>();
240                    if id.checked_add(1).is_some() {
241                        return (id, id + 1);
242                    }
243                }
244            }();
245
246        UtpSocket {
247            socket: s,
248            connected_to: src,
249            receiver_connection_id: receiver_id,
250            sender_connection_id: sender_id,
251            seq_nr: 1,
252            ack_nr: 0,
253            state: SocketState::New,
254            incoming_buffer: Vec::new(),
255            send_window: Vec::new(),
256            unsent_queue: VecDeque::new(),
257            duplicate_ack_count: 0,
258            last_acked: 0,
259            last_acked_timestamp: 0,
260            last_dropped: 0,
261            rtt: 0,
262            rtt_variance: 0,
263            read_ready_data: VecDeque::new(),
264            pending_data: VecDeque::new(),
265            curr_window: 0,
266            remote_wnd_size: 0,
267            current_delays: Vec::new(),
268            base_delays: VecDeque::with_capacity(BASE_HISTORY),
269            their_delay: 0,
270            last_rollover: 0,
271            congestion_timeout: INITIAL_CONGESTION_TIMEOUT,
272            cwnd: INIT_CWND * MSS,
273            max_retransmission_retries: MAX_RETRANSMISSION_RETRIES,
274            user_read_timeout: 0,
275            last_congestion_update: SteadyTime::now(),
276            retries: 0,
277            state_packet: None,
278            last_msg_sent_timestamp: SteadyTime::now(),
279        }
280    }
281
282    /// Creates a new UTP socket from the given UDP socket.
283    pub fn bind_with_udp_socket(socket: UdpSocket) -> Result<UtpSocket> {
284        socket.local_addr().map(|a| UtpSocket::from_raw_parts(socket, a))
285    }
286
287    /// Creates a new UTP socket from the given address.
288    ///
289    /// The address type can be any implementer of the `ToSocketAddr` trait. See its documentation
290    /// for concrete examples.
291    ///
292    /// If more than one valid address is specified, only the first will be used.
293    pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<UtpSocket> {
294        take_address(addr).and_then(|a| UdpSocket::bind(a).map(|s| UtpSocket::from_raw_parts(s, a)))
295    }
296
297    /// Returns the socket address that this socket was created from.
298    pub fn local_addr(&self) -> Result<SocketAddr> {
299        self.socket.local_addr()
300    }
301
302    /// Returns the socket address of the remote peer of this UTP connection.
303    pub fn peer_addr(&self) -> Result<SocketAddr> {
304        if self.state == SocketState::Connected || self.state == SocketState::FinSent {
305            Ok(self.connected_to)
306        } else {
307            Err(Error::from(SocketError::NotConnected))
308        }
309    }
310
311    /// Opens a connection to a remote host by hostname or IP address.
312    ///
313    /// The address type can be any implementer of the `ToSocketAddr` trait. See its documentation
314    /// for concrete examples.
315    ///
316    /// If more than one valid address is specified, only the first will be used.
317    pub fn connect<A: ToSocketAddrs>(other: A) -> Result<UtpSocket> {
318        let addr = try!(take_address(other));
319        let my_addr = match addr {
320            SocketAddr::V4(_) => "0.0.0.0:0",
321            SocketAddr::V6(_) => ":::0",
322        };
323        let mut socket = try!(UtpSocket::bind(my_addr));
324        socket.connected_to = addr;
325
326        let mut packet = Packet::new();
327        packet.set_type(PacketType::Syn);
328        packet.set_connection_id(socket.receiver_connection_id);
329        packet.set_seq_nr(socket.seq_nr);
330
331        let mut buf = [0; BUF_SIZE];
332        let mut syn_timeout = socket.congestion_timeout;
333        let mut syn_retries = 0;
334
335        while syn_retries < MAX_SYN_RETRIES {
336            packet.set_timestamp_microseconds(now_microseconds());
337
338            // Send packet
339            debug!("Connecting to {}", socket.connected_to);
340            try!(socket.socket.send_to(&packet.to_bytes()[..], socket.connected_to));
341            socket.state = SocketState::SynSent;
342            debug!("sent {:?}", packet);
343
344            // Validate response
345            socket.socket
346                  .set_read_timeout(Some(Duration::from_millis(syn_timeout)))
347                  .expect("Error setting read timeout");
348            match socket.socket.recv_from(&mut buf) {
349                Ok((read, addr)) => {
350                    let packet = try!(Packet::from_bytes(&buf[..read]).or(Err(SocketError::InvalidPacket)));
351
352                    socket.connected_to = addr;
353
354                    if packet.get_type() != PacketType::State {
355                        // The network might have dropped the `State` packet
356                        // from the peer, so we need to ask for it again.
357                        syn_retries += 1;
358                        continue;
359                    }
360
361                    try!(socket.handle_packet(&packet, addr));
362
363                    return Ok(socket);
364                },
365                Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
366                               e.kind() == ErrorKind::TimedOut) => {
367                    debug!("Timed out, retrying");
368                    syn_timeout *= 2;
369                    syn_retries += 1;
370                    continue;
371                }
372                Err(e) => return Err(e),
373            };
374        }
375
376        Err(Error::from(SocketError::ConnectionTimedOut))
377    }
378
379    /// If you have already prepared UDP sockets at each end (e.g. you're doing
380    /// hole punching), then the rendezvous connection setup is your choice.
381    ///
382    /// Rendezvous connection will only use the specified socket and addresses,
383    /// but each end must call `rendezvous_connect` itself.
384    ///
385    /// This is an unofficial extension to the uTP protocol. Both peers will try
386    /// to act as initiator and acceptor sockets. Then, the connection id which
387    /// is numerically lower decides which end will assume which role (initiator
388    /// or acceptor).
389    pub fn rendezvous_connect<A: ToSocketAddrs>(udp_socket: UdpSocket,
390                                                other: A)
391                                                -> Result<UtpSocket> {
392        let addr = try!(take_address(other));
393        let mut socket = try!(UtpSocket::bind_with_udp_socket(udp_socket));
394        socket.rendezvous_connect_to(addr).map(|_| socket)
395    }
396
397    fn rendezvous_connect_to(&mut self, addr: SocketAddr) -> Result<()> {
398        self.connected_to = addr;
399
400        let mut packet = Packet::new();
401        packet.set_type(PacketType::Syn);
402        packet.set_connection_id(self.receiver_connection_id);
403        packet.set_seq_nr(self.seq_nr);
404
405        let mut buf = [0; BUF_SIZE];
406
407        let mut syn_timeout = self.congestion_timeout;
408        let mut retry_count = 0;
409
410        let mut rx_syn: Option<Packet> = None;
411        let mut rx_state: Option<Packet> = None;
412
413        while retry_count < MAX_SYN_RETRIES {
414            packet.set_timestamp_microseconds(now_microseconds());
415
416            // Send packet
417            debug!("Connecting to {}", self.connected_to);
418            try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
419            self.last_msg_sent_timestamp = SteadyTime::now();
420            self.state = SocketState::SynSent;
421            debug!("sent {:?}", packet);
422
423            try!(self.socket.set_read_timeout(Some(Duration::from_millis(syn_timeout))));
424
425            // Validate response
426            match self.socket.recv_from(&mut buf) {
427                Ok((read, src)) => {
428                    let mut packet = match Packet::from_bytes(&buf[..read]) {
429                        Ok(packet) => packet,
430                        Err(_) => {
431                            continue;
432                        }
433                    };
434
435                    let cid = min(self.receiver_connection_id, packet.connection_id());
436
437                    packet.set_connection_id(cid);
438
439                    // Would be nicer to handle this in the handle_packet
440                    // function, but we'd need to add new socket state
441                    // not to interfere with SocketState::New and
442                    // SocketState::SynSent states.
443                    match packet.get_type() {
444                        PacketType::Syn => {
445                            self.receiver_connection_id = cid;
446                            self.sender_connection_id = cid + 1;
447
448                            let reply = self.prepare_reply(&packet, PacketType::State);
449                            try!(self.socket.send_to(&reply.to_bytes()[..], self.connected_to));
450                            self.last_msg_sent_timestamp = SteadyTime::now();
451
452                            rx_syn = Some(packet);
453                        }
454                        PacketType::State => {
455                            self.receiver_connection_id = cid;
456                            self.sender_connection_id = cid + 1;
457
458                            rx_state = Some(packet);
459                        }
460                        _ => continue,
461                    }
462
463                    match (&rx_syn, &rx_state) {
464                        (&Some(ref _syn), &Some(ref state)) => {
465                            try!(self.handle_packet(state, src));
466                            return Ok(());
467                        }
468                        _ => continue,
469                    }
470                }
471                Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
472                               e.kind() == ErrorKind::TimedOut) => {
473                    debug!("Timed out, retrying");
474                    syn_timeout *= 2;
475                    retry_count += 1;
476                    continue;
477                }
478                Err(e) => return Err(e),
479            };
480        }
481
482        Err(Error::from(SocketError::ConnectionTimedOut))
483    }
484
485    /// Gracefully closes connection to peer.
486    ///
487    /// This method allows both peers to receive all packets still in
488    /// flight.
489    pub fn close(&mut self) -> Result<()> {
490        // Nothing to do if the socket's already closed or not connected
491        if self.state == SocketState::Closed || self.state == SocketState::New ||
492           self.state == SocketState::SynSent {
493            return Ok(());
494        }
495
496        // Flush unsent and unacknowledged packets
497        try!(self.flush());
498
499        let mut packet = Packet::new();
500        packet.set_connection_id(self.sender_connection_id);
501        packet.set_seq_nr(self.seq_nr);
502        packet.set_ack_nr(self.ack_nr);
503        packet.set_timestamp_microseconds(now_microseconds());
504        packet.set_type(PacketType::Fin);
505
506        // Send FIN
507        try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
508        self.last_msg_sent_timestamp = SteadyTime::now();
509        debug!("sent {:?}", packet);
510        self.state = SocketState::FinSent;
511
512        // Receive JAKE
513        let mut buf = [0; BUF_SIZE];
514        while self.state != SocketState::Closed {
515            try!(self.recv(&mut buf, false));
516        }
517
518        Ok(())
519    }
520
521    /// Receives data from socket.
522    ///
523    /// On success, returns the number of bytes read and the sender's address.
524    /// Returns 0 bytes read after receiving a FIN packet when the remaining
525    /// in-flight packets are consumed.
526    pub fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
527        let read = {
528            let (read_ready_0, read_ready_1) = self.read_ready_data.as_slices();
529            let mut read = 0;
530            if read_ready_0.len() > 0 {
531                read = unsafe_copy(read_ready_0, buf)
532            }
533            if read_ready_1.len() > 0 {
534                read += unsafe_copy(read_ready_1, &mut buf[read..])
535            }
536            read
537        };
538        if read > 0 {
539            self.read_ready_data.drain(..read);
540            return Ok((read, self.connected_to));
541        }
542
543        let read = self.flush_incoming_buffer(buf);
544
545        if read > 0 {
546            Ok((read, self.connected_to))
547        } else {
548            // If the socket received a reset packet and all data has been flushed, then it can't
549            // receive anything else
550            if self.state == SocketState::ResetReceived {
551                return Err(Error::from(SocketError::ConnectionReset));
552            }
553
554            loop {
555                // A closed socket with no pending data can only "read" 0 new bytes.
556                if self.state == SocketState::Closed {
557                    return Ok((0, self.connected_to));
558                }
559
560                match self.recv(buf, true) {
561                    Ok((0, _src)) => continue,
562                    Ok(x) => return Ok(x),
563                    Err(e) => return Err(e),
564                }
565            }
566        }
567    }
568
569    /// Changes read operations to block for at most the specified number of
570    /// milliseconds.
571    pub fn set_read_timeout(&mut self, user_timeout: Option<u64>) {
572        self.user_read_timeout = match user_timeout {
573            Some(t) => {
574                if t > 0 {
575                    t
576                } else {
577                    0
578                }
579            }
580            None => 0,
581        }
582    }
583
584    #[cfg(windows)]
585    fn ignore_udp_error(e: &Error) -> bool {
586        // On Windows, the recv_from operation on the UDP socket may return the
587        // following errors, which are expected and should be ignored:
588        //
589        // - 10054 (WSAECONNRESET): Windows can send this error if a previous
590        //   send operation resulted in an ICMP Port Unreachable. And if it's a
591        //   loopback interface, it can know whether there is already another
592        //   end to communicate.
593        // - 10040 (WSAEMSGSIZE): This error was randomly appearing in a test
594        //   that I conducted. Not really sure why it's happening. The frequency
595        //   decreased when I increased the receive buffer size, but it was not
596        //   important to get a network up and running.
597        //
598        // Without these changes, it was impossible to get a relatively large
599        // network running without issues. By large I mean a test that might be
600        // too bursting for a single machine to run.
601        //
602        // More references:
603        //
604        // - http://stackoverflow.com/questions/30749423/is-winsock-error-10054-wsaeconnreset-normal-with-udp-to-from-localhost#comment49588739_30749423
605        // - https://github.com/maidsafe/crust/pull/454
606        const WSAECONNRESET: i32 = 10054;
607        const WSAEMSGSIZE: i32 = 10040;
608        match e.raw_os_error() {
609            Some(e) => match e {
610                WSAECONNRESET | WSAEMSGSIZE => true,
611                _ => false,
612            },
613            None => false,
614        }
615    }
616
617    #[cfg(not(windows))]
618    fn ignore_udp_error(_: &Error) -> bool {
619        false
620    }
621
622    fn recv(&mut self, buf: &mut [u8], use_user_timeout: bool) -> Result<(usize, SocketAddr)> {
623        let mut b = [0; BUF_SIZE + HEADER_SIZE];
624        let now = SteadyTime::now();
625        let (read, src);
626        let user_timeout = if use_user_timeout {
627            self.user_read_timeout
628        } else {
629            0
630        };
631        let use_user_timeout = user_timeout != 0;
632
633        // Try to receive a packet and handle timeouts
634        loop {
635            // Abort loop if the current try exceeds the maximum number of retransmission retries.
636            if self.retries >= self.max_retransmission_retries {
637                debug!("exceeds max_retransmission_retries : {} ; current connect state is : {:?}",
638                       self.max_retransmission_retries,
639                       self.state);
640                self.state = SocketState::Closed;
641                debug!("socket marked as closed from {:?} to {:?}",
642                       self.local_addr(),
643                       self.connected_to);
644                return Err(Error::from(SocketError::ConnectionTimedOut));
645            }
646
647            let timeout;
648            let congestion_timeout = if self.state != SocketState::New {
649                debug!("setting read timeout of {} ms", self.congestion_timeout);
650                Some(Duration::from_millis(self.congestion_timeout))
651            } else {
652                None
653            };
654            {
655                let user_timeout = Duration::from_millis(user_timeout);
656                timeout = if use_user_timeout {
657                    match congestion_timeout {
658                        Some(congestion_timeout) => {
659                            use std::cmp::min;
660                            Some(min(congestion_timeout, user_timeout))
661                        }
662                        None => Some(user_timeout),
663                    }
664                } else {
665                    congestion_timeout
666                };
667            }
668
669            if use_user_timeout {
670                let user_timeout = time::Duration::milliseconds(user_timeout as i64);
671                if (SteadyTime::now() - now) >= user_timeout {
672                    return Err(Error::from(SocketError::UserTimedOut));
673                }
674            }
675
676            self.socket.set_read_timeout(timeout).expect("Error setting read timeout");
677            match self.socket.recv_from(&mut b) {
678                Ok((r, s)) => {
679                    read = r;
680                    src = s;
681                    break;
682                }
683                Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
684                               e.kind() == ErrorKind::TimedOut) => {
685                    debug!("recv_from timed out");
686                    let now = SteadyTime::now();
687                    let congestion_timeout = {
688                        time::Duration::milliseconds(self.congestion_timeout as i64)
689                    };
690                    if !use_user_timeout ||
691                       ((now - self.last_congestion_update) >= congestion_timeout) {
692                        self.last_congestion_update = now;
693                        try!(self.handle_receive_timeout());
694                        self.retries += 1;
695                    }
696                }
697                Err(ref e) if Self::ignore_udp_error(e) => (),
698                Err(e) => return Err(e),
699            };
700
701            let elapsed = (SteadyTime::now() - now).num_milliseconds();
702            debug!("{} ms elapsed", elapsed);
703        }
704
705        self.last_congestion_update = SteadyTime::now();
706        self.retries = 0;
707
708        // Decode received data into a packet
709        let packet = match Packet::from_bytes(&b[..read]) {
710            Ok(packet) => packet,
711            Err(e) => {
712                debug!("{}", e);
713                debug!("Ignoring invalid packet");
714                return Ok((0, self.connected_to));
715            }
716        };
717        debug!("received {:?}", packet);
718
719        // Process packet, including sending a reply if necessary
720        if let Some(mut pkt) = try!(self.handle_packet(&packet, src)) {
721            pkt.set_wnd_size(BUF_SIZE as u32);
722            try!(self.socket.send_to(&pkt.to_bytes()[..], src));
723            self.last_msg_sent_timestamp = SteadyTime::now();
724            debug!("sent {:?}", pkt);
725        }
726
727        // Insert data packet into the incoming buffer if it isn't a duplicate of a previously
728        // discarded packet
729        if packet.get_type() == PacketType::Data {
730            if Sequence::less(self.last_dropped, packet.seq_nr()) {
731                self.insert_into_buffer(packet);
732            }
733        }
734
735        // Flush incoming buffer if possible
736        let read = self.flush_incoming_buffer(buf);
737
738        Ok((read, src))
739    }
740
741    fn handle_receive_timeout(&mut self) -> Result<()> {
742        self.congestion_timeout *= 2;
743        self.cwnd = MSS;
744
745        // There are three possible cases here:
746        //
747        // - If the socket is sending and waiting for acknowledgements (the send window is
748        //   not empty), resend the first unacknowledged packet;
749        //
750        // - If the socket is not sending and it hasn't sent a FIN yet, then it's waiting
751        //   for incoming packets: send a fast resend request;
752        //
753        // - If the socket sent a FIN previously, resend it.
754        debug!("self.send_window: {:?}",
755               self.send_window
756                   .iter()
757                   .map(Packet::seq_nr)
758                   .collect::<Vec<u16>>());
759
760        if self.send_window.is_empty() {
761            // The socket is trying to close, all sent packets were acknowledged, and it has
762            // already sent a FIN: resend it.
763            if self.state == SocketState::FinSent {
764                let mut packet = Packet::new();
765                packet.set_connection_id(self.sender_connection_id);
766                packet.set_seq_nr(self.seq_nr);
767                packet.set_ack_nr(self.ack_nr);
768                packet.set_timestamp_microseconds(now_microseconds());
769                packet.set_type(PacketType::Fin);
770
771                // Send FIN
772                try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
773                self.last_msg_sent_timestamp = SteadyTime::now();
774                debug!("resent FIN: {:?}", packet);
775            } else if self.state != SocketState::New {
776                // The socket is waiting for incoming packets but the remote peer is silent:
777                // send a fast resend request.
778                debug!("sending fast resend request");
779                self.send_fast_resend_request();
780            }
781        } else {
782            // The socket is sending data packets but there is no reply from the remote
783            // peer: resend the first unacknowledged packet with the current timestamp.
784            let mut packet = &mut self.send_window[0];
785            packet.set_timestamp_microseconds(now_microseconds());
786            try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
787            self.last_msg_sent_timestamp = SteadyTime::now();
788            debug!("resent {:?}", packet);
789        }
790
791        Ok(())
792    }
793
794    fn prepare_reply(&self, original: &Packet, t: PacketType) -> Packet {
795        let mut resp = Packet::new();
796        resp.set_type(t);
797        let self_t_micro: u32 = now_microseconds();
798        let other_t_micro: u32 = original.timestamp_microseconds();
799        resp.set_timestamp_microseconds(self_t_micro);
800        resp.set_timestamp_difference_microseconds(self_t_micro.wrapping_sub(other_t_micro));
801        resp.set_connection_id(self.sender_connection_id);
802        resp.set_seq_nr(self.seq_nr);
803        resp.set_ack_nr(self.ack_nr);
804
805        resp
806    }
807
808    /// Removes a packet in the incoming buffer and updates the current acknowledgement number.
809    fn advance_incoming_buffer(&mut self) -> Option<Packet> {
810        if !self.incoming_buffer.is_empty() {
811            let packet = self.incoming_buffer.remove(0);
812            debug!("Removed packet from incoming buffer: {:?}", packet);
813            self.ack_nr = packet.seq_nr();
814            self.last_dropped = self.ack_nr;
815            Some(packet)
816        } else {
817            None
818        }
819    }
820
821    /// Discards sequential, ordered packets in incoming buffer, starting from
822    /// the most recently acknowledged to the most recent, as long as there are
823    /// no missing packets. The discarded packets' payload is written to the
824    /// slice `buf`, starting in position `start`.
825    /// Returns the last written index.
826    fn flush_incoming_buffer(&mut self, buf: &mut [u8]) -> usize {
827        // Return pending data from a partially read packet
828        if !self.pending_data.is_empty() {
829            let flushed = {
830                let (pending_0, pending_1) = self.pending_data.as_slices();
831                let mut flushed = 0;
832                if pending_0.len() > 0 {
833                    flushed += unsafe_copy(pending_0, buf);
834                }
835                if pending_1.len() > 0 {
836                    flushed += unsafe_copy(pending_1, &mut buf[flushed..]);
837                }
838                flushed
839            };
840
841            if flushed == self.pending_data.len() {
842                self.pending_data.clear();
843                self.advance_incoming_buffer();
844            } else {
845                self.pending_data.drain(..flushed);
846            }
847
848            return flushed;
849        }
850
851        if !self.incoming_buffer.is_empty() &&
852           (self.ack_nr == self.incoming_buffer[0].seq_nr() ||
853            self.ack_nr.wrapping_add(1) == self.incoming_buffer[0].seq_nr())
854        {
855            let flushed = unsafe_copy(&self.incoming_buffer[0].payload[..], buf);
856
857            if flushed == self.incoming_buffer[0].payload.len() {
858                self.advance_incoming_buffer();
859            } else {
860                self.pending_data.extend(self.incoming_buffer[0].payload.drain(flushed..));
861            }
862
863            return flushed;
864        }
865
866        0
867    }
868
869    /// Sends data on the socket to the remote peer. On success, returns the number of bytes
870    /// written.
871    //
872    // # Implementation details
873    //
874    // This method inserts packets into the send buffer and keeps trying to
875    // advance the send window until an ACK corresponding to the last packet is
876    // received.
877    //
878    // Note that the buffer passed to `send_to` might exceed the maximum packet
879    // size, which will result in the data being split over several packets.
880    pub fn send_to(&mut self, buf: &[u8]) -> Result<usize> {
881        if self.state == SocketState::Closed {
882            return Err(Error::from(SocketError::ConnectionClosed));
883        }
884
885        let total_length = buf.len();
886
887        for chunk in buf.chunks(MSS as usize - HEADER_SIZE) {
888            let mut packet = Packet::with_payload(chunk);
889            packet.set_seq_nr(self.seq_nr);
890            packet.set_ack_nr(self.ack_nr);
891            packet.set_connection_id(self.sender_connection_id);
892
893            self.unsent_queue.push_back(packet);
894
895            // `OverflowingOps` is marked unstable, so we can't use `overflowing_add` here
896            if self.seq_nr == ::std::u16::MAX {
897                self.seq_nr = 0;
898            } else {
899                self.seq_nr += 1;
900            }
901        }
902
903        // Send every packet in the queue
904        try!(self.send());
905
906        Ok(total_length)
907    }
908
909    /// Consumes acknowledgements for every pending packet.
910    pub fn flush(&mut self) -> Result<()> {
911        let mut buf = [0u8; BUF_SIZE];
912        while !self.send_window.is_empty() {
913            debug!("packets in send window: {}", self.send_window.len());
914            try!(self.recv(&mut buf, false));
915        }
916
917        Ok(())
918    }
919
920    fn send_state(&mut self) {
921        let mut packet = Packet::new();
922        packet.set_type(PacketType::State);
923        let self_t_micro: u32 = now_microseconds();
924        packet.set_timestamp_microseconds(self_t_micro);
925        packet.set_timestamp_difference_microseconds(self.their_delay);
926        packet.set_connection_id(self.sender_connection_id);
927        packet.set_seq_nr(self.seq_nr);
928        packet.set_ack_nr(self.ack_nr);
929        let _ = self.socket.send_to(&packet.to_bytes()[..], self.connected_to);
930        self.last_msg_sent_timestamp = SteadyTime::now();
931    }
932
933    /// Send a keepalive packet on the stream.
934    pub fn send_keepalive(&mut self) {
935        if (SteadyTime::now() - self.last_msg_sent_timestamp).num_milliseconds()
936            >= 14_000 {
937            self.send_state();
938        }
939    }
940
941    /// Sends every packet in the unsent packet queue.
942    fn send(&mut self) -> Result<()> {
943        while let Some(mut packet) = self.unsent_queue.pop_front() {
944            try!(self.send_packet(&mut packet));
945            self.curr_window += packet.len() as u32;
946            self.send_window.push(packet);
947        }
948        Ok(())
949    }
950
951    /// Send one packet.
952    #[inline]
953    fn send_packet(&mut self, packet: &mut Packet) -> Result<()> {
954        debug!("current window: {}", self.send_window.len());
955        let max_inflight = min(self.cwnd, self.remote_wnd_size);
956        let max_inflight = max(MIN_CWND * MSS, max_inflight);
957        let now = now_microseconds();
958
959        // Wait until enough in-flight packets are acknowledged for rate control purposes, but don't
960        // wait more than 500 ms (PRE_SEND_TIMEOUT) before sending the packet.
961        while self.curr_window >= max_inflight && now_microseconds() - now < PRE_SEND_TIMEOUT {
962            debug!("self.curr_window: {}", self.curr_window);
963            debug!("max_inflight: {}", max_inflight);
964            debug!("self.duplicate_ack_count: {}", self.duplicate_ack_count);
965            debug!("now_microseconds() - now = {}", now_microseconds() - now);
966            let mut buf = [0; BUF_SIZE];
967            let (read, _) = try!(self.recv(&mut buf, false));
968            self.read_ready_data.extend(&buf[..read]);
969        }
970        debug!("out: now_microseconds() - now = {}",
971               now_microseconds() - now);
972
973        // Check if it still makes sense to send packet, as we might be trying to resend a lost
974        // packet acknowledged in the receive loop above.
975        // If there were no wrapping around of sequence numbers, we'd simply check if the packet's
976        // sequence number is greater than `last_acked`.
977        let distance_a = packet.seq_nr().wrapping_sub(self.last_acked);
978        let distance_b = self.last_acked.wrapping_sub(packet.seq_nr());
979        if distance_a > distance_b {
980            debug!("Packet already acknowledged, skipping...");
981            return Ok(());
982        }
983
984        packet.set_timestamp_microseconds(now_microseconds());
985        packet.set_timestamp_difference_microseconds(self.their_delay);
986        try!(self.socket.send_to(&packet.to_bytes()[..], self.connected_to));
987        self.last_msg_sent_timestamp = SteadyTime::now();
988        debug!("sent {:?}", packet);
989
990        Ok(())
991    }
992
993    // Insert a new sample in the base delay list.
994    //
995    // The base delay list contains at most `BASE_HISTORY` samples, each sample is the minimum
996    // measured over a period of a minute (MAX_BASE_DELAY_AGE).
997    fn update_base_delay(&mut self, base_delay: i64, now: i64) {
998        if self.base_delays.is_empty() || now - self.last_rollover > MAX_BASE_DELAY_AGE {
999            // Update last rollover
1000            self.last_rollover = now;
1001
1002            // Drop the oldest sample, if need be
1003            if self.base_delays.len() == BASE_HISTORY {
1004                self.base_delays.pop_front();
1005            }
1006
1007            // Insert new sample
1008            self.base_delays.push_back(base_delay);
1009        } else {
1010            // Replace sample for the current minute if the delay is lower
1011            let last_idx = self.base_delays.len() - 1;
1012            if base_delay < self.base_delays[last_idx] {
1013                self.base_delays[last_idx] = base_delay;
1014            }
1015        }
1016    }
1017
1018    /// Inserts a new sample in the current delay list after removing samples older than one RTT, as
1019    /// specified in RFC6817.
1020    fn update_current_delay(&mut self, v: i64, now: i64) {
1021        // Remove samples more than one RTT old
1022        let rtt = self.rtt as i64 * 100;
1023        while !self.current_delays.is_empty() && now - self.current_delays[0].received_at > rtt {
1024            self.current_delays.remove(0);
1025        }
1026
1027        // Insert new measurement
1028        self.current_delays.push(DelayDifferenceSample {
1029            received_at: now,
1030            difference: v,
1031        });
1032    }
1033
1034    fn update_congestion_timeout(&mut self, current_delay: i32) {
1035        let delta = self.rtt - current_delay;
1036        self.rtt_variance += (delta.abs() - self.rtt_variance) / 4;
1037        self.rtt += (current_delay - self.rtt) / 8;
1038        self.congestion_timeout = max((self.rtt + self.rtt_variance * 4) as u64,
1039                                      MIN_CONGESTION_TIMEOUT);
1040        self.congestion_timeout = min(self.congestion_timeout, MAX_CONGESTION_TIMEOUT);
1041
1042        debug!("current_delay: {}", current_delay);
1043        debug!("delta: {}", delta);
1044        debug!("self.rtt_variance: {}", self.rtt_variance);
1045        debug!("self.rtt: {}", self.rtt);
1046        debug!("self.congestion_timeout: {}", self.congestion_timeout);
1047    }
1048
1049    /// Calculates the filtered current delay in the current window.
1050    ///
1051    /// The current delay is calculated through application of the exponential
1052    /// weighted moving average filter with smoothing factor 0.333 over the
1053    /// current delays in the current window.
1054    fn filtered_current_delay(&self) -> i64 {
1055        let input = self.current_delays.iter().map(|x| x.difference);
1056        ewma(input, 0.333) as i64
1057    }
1058
1059    /// Calculates the lowest base delay in the current window.
1060    fn min_base_delay(&self) -> i64 {
1061        self.base_delays.iter().min().cloned().unwrap_or(0)
1062    }
1063
1064    /// Builds the selective acknowledgement extension data for usage in packets.
1065    fn build_selective_ack(&self) -> Vec<u8> {
1066        let stashed = self.incoming_buffer
1067                          .iter()
1068                          .filter(|pkt| pkt.seq_nr() > self.ack_nr + 1)
1069                          .map(|pkt| (pkt.seq_nr() - self.ack_nr - 2) as usize)
1070                          .map(|diff| (diff / 8, diff % 8));
1071
1072        let mut sack = Vec::new();
1073        for (byte, bit) in stashed {
1074            // Make sure the amount of elements in the SACK vector is a
1075            // multiple of 4 and enough to represent the lost packets
1076            while byte >= sack.len() || sack.len() % 4 != 0 {
1077                sack.push(0u8);
1078            }
1079
1080            sack[byte] |= 1 << bit;
1081        }
1082
1083        sack
1084    }
1085
1086    /// Sends a fast resend request to the remote peer.
1087    ///
1088    /// A fast resend request consists of sending three State packets (acknowledging the last
1089    /// received packet) in quick succession.
1090    fn send_fast_resend_request(&mut self) {
1091        for _ in 0..3 {
1092            self.send_state();
1093        }
1094    }
1095
1096    fn resend_lost_packet(&mut self, lost_packet_nr: u16) {
1097        debug!("---> resend_lost_packet({}) <---", lost_packet_nr);
1098        match self.send_window.iter().position(|pkt| pkt.seq_nr() == lost_packet_nr) {
1099            None => debug!("Packet {} not found", lost_packet_nr),
1100            Some(position) => {
1101                debug!("self.send_window.len(): {}", self.send_window.len());
1102                debug!("position: {}", position);
1103                let mut packet = self.send_window[position].clone();
1104                // FIXME: Unchecked result
1105                let _ = self.send_packet(&mut packet);
1106
1107                // We intentionally don't increase `curr_window` because otherwise a packet's length
1108                // would be counted more than once
1109            }
1110        }
1111        debug!("---> END resend_lost_packet <---");
1112    }
1113
1114    /// Forgets sent packets that were acknowledged by the remote peer.
1115    fn advance_send_window(&mut self) {
1116        // The reason I'm not removing the first element in a loop while its sequence number is
1117        // smaller than `last_acked` is because of wrapping sequence numbers, which would create the
1118        // sequence [..., 65534, 65535, 0, 1, ...]. If `last_acked` is smaller than the first
1119        // packet's sequence number because of wraparound (for instance, 1), no packets would be
1120        // removed, as the condition `seq_nr < last_acked` would fail immediately.
1121        //
1122        // On the other hand, I can't keep removing the first packet in a loop until its sequence
1123        // number matches `last_acked` because it might never match, and in that case no packets
1124        // should be removed.
1125        if let Some(position) = self.send_window
1126                                    .iter()
1127                                    .position(|pkt| pkt.seq_nr() == self.last_acked) {
1128            for _ in 0..position + 1 {
1129                let packet = self.send_window.remove(0);
1130                self.curr_window -= packet.len() as u32;
1131            }
1132        }
1133        debug!("self.curr_window: {}", self.curr_window);
1134    }
1135
1136    /// Handles an incoming packet, updating socket state accordingly.
1137    ///
1138    /// Returns the appropriate reply packet, if needed.
1139    fn handle_packet(&mut self, packet: &Packet, src: SocketAddr) -> Result<Option<Packet>> {
1140        debug!("({:?}, {:?})", self.state, packet.get_type());
1141
1142        let is_data_or_fin = packet.get_type() == PacketType::Data
1143                          || packet.get_type() == PacketType::Fin;
1144
1145        // Acknowledge only if the packet strictly follows the previous one
1146        // and only if it is a payload packet. The restriction on PacketType
1147        // is due to all other (non Data) packets are assigned seq_nr the
1148        // same as the next Data packet, thus we could acknowledge what
1149        // we have not received yet.
1150        if is_data_or_fin && packet.seq_nr().wrapping_sub(self.ack_nr) == 1 {
1151            self.ack_nr = packet.seq_nr();
1152        }
1153
1154        // Reset connection if connection id doesn't match and this isn't a SYN
1155        if packet.get_type() != PacketType::Syn && self.state != SocketState::SynSent &&
1156           !(packet.connection_id() == self.sender_connection_id ||
1157             packet.connection_id() == self.receiver_connection_id) {
1158            return Ok(Some(self.prepare_reply(packet, PacketType::Reset)));
1159        }
1160
1161        // Update remote window size
1162        self.remote_wnd_size = packet.wnd_size();
1163        debug!("self.remote_wnd_size: {}", self.remote_wnd_size);
1164
1165        // Update remote peer's delay between them sending the packet and us receiving it
1166        let now = now_microseconds();
1167        self.their_delay = now.wrapping_sub(packet.timestamp_microseconds());
1168        debug!("self.their_delay: {}", self.their_delay);
1169
1170        match (self.state, packet.get_type()) {
1171            (SocketState::New, PacketType::Syn) => {
1172                self.connected_to = src;
1173                self.ack_nr = packet.seq_nr();
1174                self.seq_nr = rand::random();
1175                self.last_acked = self.seq_nr.wrapping_sub(1);
1176                self.receiver_connection_id = packet.connection_id() + 1;
1177                self.sender_connection_id = packet.connection_id();
1178                self.state = SocketState::Connected;
1179                self.last_dropped = self.ack_nr;
1180
1181                self.state_packet = Some(self.prepare_reply(packet, PacketType::State));
1182
1183                // Advance the self.seq_nr (the sequence number of the next packet),
1184                // this is because the other end will use the `seq_nr` of this state
1185                // packet as his `self.last_acked`
1186                self.seq_nr = self.seq_nr.wrapping_add(1);
1187
1188                Ok(self.state_packet.clone())
1189            }
1190            (SocketState::Connected, PacketType::Syn) if self.connected_to == src => {
1191                // The other end might have sent another Syn packet because
1192                // a reply to the first one did not arrive within a timeout
1193                // caused by network congestion.
1194                Ok(self.state_packet.clone())
1195            }
1196            (_, PacketType::Syn) => {
1197                Ok(Some(self.prepare_reply(packet, PacketType::Reset)))
1198            }
1199            (SocketState::SynSent, PacketType::State) => {
1200                self.connected_to = src;
1201                self.ack_nr = packet.seq_nr();
1202                self.seq_nr += 1;
1203                self.state = SocketState::Connected;
1204                self.last_acked = packet.ack_nr();
1205                self.last_dropped = packet.seq_nr();
1206                self.last_acked_timestamp = now_microseconds();
1207                Ok(None)
1208            }
1209            (SocketState::SynSent, _) => Err(Error::from(SocketError::InvalidReply)),
1210            (SocketState::Connected, PacketType::Data) |
1211            (SocketState::FinSent, PacketType::Data) => Ok(self.handle_data_packet(packet)),
1212            (SocketState::Connected, PacketType::State) => {
1213                self.handle_state_packet(packet);
1214                Ok(None)
1215            }
1216            (SocketState::Connected, PacketType::Fin) |
1217            (SocketState::FinSent, PacketType::Fin) => {
1218                if packet.ack_nr() < self.seq_nr {
1219                    debug!("FIN received but there are missing acknowledgements for sent packets");
1220                }
1221                let mut reply = self.prepare_reply(packet, PacketType::State);
1222                if packet.seq_nr().wrapping_sub(self.ack_nr) > 1 {
1223                    debug!("current ack_nr ({}) is behind received packet seq_nr ({})",
1224                           self.ack_nr,
1225                           packet.seq_nr());
1226
1227                    // Set SACK extension payload if the packet is not in order
1228                    let sack = self.build_selective_ack();
1229
1230                    if sack.len() > 0 {
1231                        reply.set_sack(sack);
1232                    }
1233                }
1234
1235                // Give up, the remote peer might not care about our missing packets
1236                self.state = SocketState::Closed;
1237                Ok(Some(reply))
1238            }
1239            (SocketState::FinSent, PacketType::State) => {
1240                if packet.ack_nr() == self.seq_nr {
1241                    self.state = SocketState::Closed;
1242                } else {
1243                    self.handle_state_packet(packet);
1244                }
1245                Ok(None)
1246            }
1247            (_, PacketType::Reset) => {
1248                self.state = SocketState::ResetReceived;
1249                Err(Error::from(SocketError::ConnectionReset))
1250            }
1251            (state, ty) => {
1252                let message = format!("Unimplemented handling for ({:?},{:?})", state, ty);
1253                debug!("{}", message);
1254                Err(Error::new(ErrorKind::Other, message))
1255            }
1256        }
1257    }
1258
1259    fn handle_data_packet(&mut self, packet: &Packet) -> Option<Packet> {
1260        // If a FIN was previously sent, reply with a FIN packet acknowledging the received packet.
1261        let packet_type = if self.state == SocketState::FinSent {
1262            PacketType::Fin
1263        } else {
1264            PacketType::State
1265        };
1266        let mut reply = self.prepare_reply(packet, packet_type);
1267
1268        if packet.seq_nr().wrapping_sub(self.ack_nr) > 1 {
1269            debug!("current ack_nr ({}) is behind received packet seq_nr ({})",
1270                   self.ack_nr,
1271                   packet.seq_nr());
1272
1273            // Set SACK extension payload if the packet is not in order
1274            let sack = self.build_selective_ack();
1275
1276            if sack.len() > 0 {
1277                reply.set_sack(sack);
1278            }
1279        }
1280
1281        Some(reply)
1282    }
1283
1284    fn queuing_delay(&self) -> i64 {
1285        let filtered_current_delay = self.filtered_current_delay();
1286        let min_base_delay = self.min_base_delay();
1287        let queuing_delay = filtered_current_delay - min_base_delay;
1288
1289        debug!("filtered_current_delay: {}", filtered_current_delay);
1290        debug!("min_base_delay: {}", min_base_delay);
1291        debug!("queuing_delay: {}", queuing_delay);
1292
1293        queuing_delay
1294    }
1295
1296    /// Calculates the new congestion window size, increasing it or decreasing it.
1297    ///
1298    /// This is the core of uTP, the [LEDBAT][ledbat_rfc] congestion algorithm. It depends on
1299    /// estimating the queuing delay between the two peers, and adjusting the congestion window
1300    /// accordingly.
1301    ///
1302    /// `off_target` is a normalized value representing the difference between the current queuing
1303    /// delay and a fixed target delay (`TARGET`). `off_target` ranges between -1.0 and 1.0. A
1304    /// positive value makes the congestion window increase, while a negative value makes the
1305    /// congestion window decrease.
1306    ///
1307    /// `bytes_newly_acked` is the number of bytes acknowledged by an inbound `State` packet. It may
1308    /// be the size of the packet explicitly acknowledged by the inbound packet (i.e., with sequence
1309    /// number equal to the inbound packet's acknowledgement number), or every packet implicitly
1310    /// acknowledged (every packet with sequence number between the previous inbound `State`
1311    /// packet's acknowledgement number and the current inbound `State` packet's acknowledgement
1312    /// number).
1313    ///
1314    ///[ledbat_rfc]: https://tools.ietf.org/html/rfc6817
1315    fn update_congestion_window(&mut self, off_target: f64, bytes_newly_acked: u32) {
1316        let flightsize = self.curr_window;
1317
1318        let cwnd_increase = GAIN * off_target * bytes_newly_acked as f64 * MSS as f64;
1319        let cwnd_increase = cwnd_increase / self.cwnd as f64;
1320        debug!("cwnd_increase: {}", cwnd_increase);
1321
1322        self.cwnd = (self.cwnd as f64 + cwnd_increase) as u32;
1323        let max_allowed_cwnd = flightsize + ALLOWED_INCREASE * MSS;
1324        self.cwnd = min(self.cwnd, max_allowed_cwnd);
1325        self.cwnd = max(self.cwnd, MIN_CWND * MSS);
1326
1327        debug!("cwnd: {}", self.cwnd);
1328        debug!("max_allowed_cwnd: {}", max_allowed_cwnd);
1329    }
1330
1331    fn handle_state_packet(&mut self, packet: &Packet) {
1332        if packet.ack_nr() == self.last_acked {
1333            self.duplicate_ack_count += 1;
1334        } else {
1335            self.last_acked = packet.ack_nr();
1336            self.last_acked_timestamp = now_microseconds();
1337            self.duplicate_ack_count = 1;
1338        }
1339
1340        // Update congestion window size
1341        if let Some(index) = self.send_window.iter().position(|p| packet.ack_nr() == p.seq_nr()) {
1342            // Calculate the sum of the size of every packet implicitly and explicitly acknowledged
1343            // by the inbound packet (i.e., every packet whose sequence number precedes the inbound
1344            // packet's acknowledgement number, plus the packet whose sequence number matches)
1345            let bytes_newly_acked = self.send_window
1346                                        .iter()
1347                                        .take(index + 1)
1348                                        .fold(0, |acc, p| acc + p.len());
1349
1350            // Update base and current delay
1351            let now = now_microseconds() as i64;
1352            let our_delay = now - self.send_window[index].timestamp_microseconds() as i64;
1353            debug!("our_delay: {}", our_delay);
1354            self.update_base_delay(our_delay, now);
1355            self.update_current_delay(our_delay, now);
1356
1357            let off_target: f64 = (TARGET as f64 - self.queuing_delay() as f64) / TARGET as f64;
1358            debug!("off_target: {}", off_target);
1359
1360            self.update_congestion_window(off_target, bytes_newly_acked as u32);
1361
1362            // Update congestion timeout
1363            let rtt = (TARGET - off_target as i64) / 1000; // in milliseconds
1364            self.update_congestion_timeout(rtt as i32);
1365        }
1366
1367        let mut packet_loss_detected: bool = !self.send_window.is_empty() &&
1368                                             self.duplicate_ack_count == 3;
1369
1370        // Process extensions, if any
1371        for extension in packet.extensions.iter() {
1372            if extension.get_type() == ExtensionType::SelectiveAck {
1373                // If three or more packets are acknowledged past the implicit missing one,
1374                // assume it was lost.
1375                if extension.iter().count_ones() >= 3 {
1376                    self.resend_lost_packet(packet.ack_nr() + 1);
1377                    packet_loss_detected = true;
1378                }
1379
1380                if let Some(last_seq_nr) = self.send_window.last().map(Packet::seq_nr) {
1381                    for seq_nr in extension.iter()
1382                                           .enumerate()
1383                                           .filter(|&(_idx, received)| !received)
1384                                           .map(|(idx, _received)| {
1385                                               packet.ack_nr() + 2 + idx as u16
1386                                           })
1387                                           .take_while(|&seq_nr| seq_nr < last_seq_nr) {
1388                        debug!("SACK: packet {} lost", seq_nr);
1389                        self.resend_lost_packet(seq_nr);
1390                        packet_loss_detected = true;
1391                    }
1392                }
1393            } else {
1394                debug!("Unknown extension {:?}, ignoring", extension.get_type());
1395            }
1396        }
1397
1398        // Three duplicate ACKs mean a fast resend request. Resend the first unacknowledged packet
1399        // if the incoming packet doesn't have a SACK extension. If it does, the lost packets were
1400        // already resent.
1401        if !self.send_window.is_empty() && self.duplicate_ack_count == 3 &&
1402           !packet.extensions.iter().any(|ext| ext.get_type() == ExtensionType::SelectiveAck) {
1403            self.resend_lost_packet(packet.ack_nr().wrapping_add(1));
1404        }
1405
1406        // Packet lost, halve the congestion window
1407        if packet_loss_detected {
1408            debug!("packet loss detected, halving congestion window");
1409            self.cwnd = max(self.cwnd / 2, MIN_CWND * MSS);
1410            debug!("cwnd: {}", self.cwnd);
1411        }
1412
1413        // Success, advance send window
1414        self.advance_send_window();
1415    }
1416
1417    /// Inserts a packet into the socket's buffer.
1418    ///
1419    /// The packet is inserted in such a way that the packets in the buffer are sorted according to
1420    /// their sequence number in ascending order. This allows storing packets that were received out
1421    /// of order.
1422    ///
1423    /// Trying to insert a duplicate of a packet will silently fail.
1424    /// it's more recent (larger timestamp).
1425    fn insert_into_buffer(&mut self, packet: Packet) {
1426        // Immediately push to the end if the packet's sequence number comes after the last
1427        // packet's.
1428        if self.incoming_buffer.last().map(|p| packet.seq_nr() > p.seq_nr()).unwrap_or(false) {
1429            self.incoming_buffer.push(packet);
1430        } else {
1431            // Find index following the most recent packet before the one we wish to insert
1432            let i = self.incoming_buffer.iter().filter(|p| p.seq_nr() < packet.seq_nr()).count();
1433
1434            if self.incoming_buffer.get(i).map(|p| p.seq_nr() != packet.seq_nr()).unwrap_or(true) {
1435                self.incoming_buffer.insert(i, packet);
1436            }
1437        }
1438    }
1439}
1440
1441impl Drop for UtpSocket {
1442    fn drop(&mut self) {
1443        let _ = self.close();
1444    }
1445}
1446
1447/// A structure representing a socket server.
1448///
1449/// # Examples
1450///
1451/// ```no_run
1452/// use utp::{UtpListener, UtpSocket};
1453/// use std::thread;
1454///
1455/// fn handle_client(socket: UtpSocket) {
1456///     // ...
1457/// }
1458///
1459/// fn main() {
1460///     // Create a listener
1461///     let addr = "127.0.0.1:8080";
1462///     let listener = UtpListener::bind(addr).expect("Error binding socket");
1463///
1464///     for connection in listener.incoming() {
1465///         // Spawn a new handler for each new connection
1466///         if let Ok((socket, _src)) = connection {
1467///             thread::spawn(move || handle_client(socket));
1468///         }
1469///     }
1470/// }
1471/// ```
1472pub struct UtpListener {
1473    /// The public facing UDP socket
1474    socket: UdpSocket,
1475}
1476
1477impl UtpListener {
1478    /// Creates a new `UtpListener` bound to a specific address.
1479    ///
1480    /// The resulting listener is ready for accepting connections.
1481    ///
1482    /// The address type can be any implementer of the `ToSocketAddr` trait. See its documentation
1483    /// for concrete examples.
1484    ///
1485    /// If more than one valid address is specified, only the first will be used.
1486    pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<UtpListener> {
1487        UdpSocket::bind(addr).and_then(|s| Ok(UtpListener { socket: s }))
1488    }
1489
1490    /// Accepts a new incoming connection from this listener.
1491    ///
1492    /// This function will block the caller until a new uTP connection is established. When
1493    /// established, the corresponding `UtpSocket` and the peer's remote address will be returned.
1494    ///
1495    /// Notice that the resulting `UtpSocket` is bound to a different local port than the public
1496    /// listening port (which `UtpListener` holds). This may confuse the remote peer!
1497    pub fn accept(&self) -> Result<(UtpSocket, SocketAddr)> {
1498        let mut buf = [0; BUF_SIZE];
1499
1500        match self.socket.recv_from(&mut buf) {
1501            Ok((nread, src)) => {
1502                let packet = try!(Packet::from_bytes(&buf[..nread])
1503                                      .or(Err(SocketError::InvalidPacket)));
1504
1505                // Ignore non-SYN packets
1506                if packet.get_type() != PacketType::Syn {
1507                    return Err(Error::from(SocketError::InvalidPacket));
1508                }
1509
1510                // The address of the new socket will depend on the type of the listener.
1511                let inner_socket = self.socket.local_addr().and_then(|addr| {
1512                    match addr {
1513                        SocketAddr::V4(_) => UdpSocket::bind("0.0.0.0:0"),
1514                        SocketAddr::V6(_) => UdpSocket::bind(":::0"),
1515                    }
1516                });
1517
1518                let mut socket = try!(inner_socket.map(|s| UtpSocket::from_raw_parts(s, src)));
1519
1520                // Establish connection with remote peer
1521                match socket.handle_packet(&packet, src) {
1522                    Ok(Some(reply)) => try!(socket.socket.send_to(&reply.to_bytes()[..], src)),
1523                    Ok(None) => return Err(Error::from(SocketError::InvalidPacket)),
1524                    Err(e) => return Err(e),
1525                };
1526
1527                Ok((socket, src))
1528            }
1529            Err(e) => Err(e),
1530        }
1531    }
1532
1533    /// Returns an iterator over the connections being received by this listener.
1534    ///
1535    /// The returned iterator will never return `None`.
1536    pub fn incoming(&self) -> Incoming {
1537        Incoming { listener: self }
1538    }
1539
1540    /// Returns the local socket address of this listener.
1541    pub fn local_addr(&self) -> Result<SocketAddr> {
1542        self.socket.local_addr()
1543    }
1544}
1545
1546pub struct Incoming<'a> {
1547    listener: &'a UtpListener,
1548}
1549
1550impl<'a> Iterator for Incoming<'a> {
1551    type Item = Result<(UtpSocket, SocketAddr)>;
1552
1553    fn next(&mut self) -> Option<Result<(UtpSocket, SocketAddr)>> {
1554        Some(self.listener.accept())
1555    }
1556}
1557
1558#[cfg(test)]
1559mod test {
1560    use std::thread;
1561    use std::net::ToSocketAddrs;
1562    use std::io::ErrorKind;
1563    use super::{UtpSocket, UtpListener, SocketState, BUF_SIZE, take_address};
1564    use packet::{Packet, PacketType, Encodable, Decodable};
1565    use util::now_microseconds;
1566    use rand;
1567
1568    macro_rules! iotry {
1569        ($e:expr) => (match $e { Ok(e) => e, Err(e) => panic!("{:?}", e) })
1570    }
1571
1572    fn next_test_port() -> u16 {
1573        use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
1574        static NEXT_OFFSET: AtomicUsize = ATOMIC_USIZE_INIT;
1575        const BASE_PORT: u16 = 9600;
1576        BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16
1577    }
1578
1579    fn next_test_ip4<'a>() -> (&'a str, u16) {
1580        ("127.0.0.1", next_test_port())
1581    }
1582
1583    fn next_test_ip6<'a>() -> (&'a str, u16) {
1584        ("::1", next_test_port())
1585    }
1586
1587    #[test]
1588    fn test_socket_ipv4() {
1589        let server_addr = next_test_ip4();
1590
1591        let mut server = iotry!(UtpSocket::bind(server_addr));
1592        assert!(server.state == SocketState::New);
1593
1594        let child = thread::spawn(move || {
1595            let mut client = iotry!(UtpSocket::connect(server_addr));
1596            assert!(client.state == SocketState::Connected);
1597            // Check proper difference in client's send connection id and receive connection id
1598            assert_eq!(client.sender_connection_id,
1599                       client.receiver_connection_id + 1);
1600            assert_eq!(client.connected_to,
1601                       server_addr.to_socket_addrs().unwrap().next().unwrap());
1602            iotry!(client.close());
1603            drop(client);
1604        });
1605
1606        let mut buf = [0u8; BUF_SIZE];
1607        match server.recv_from(&mut buf) {
1608            e => println!("{:?}", e),
1609        }
1610        // After establishing a new connection, the server's ids are a mirror of the client's.
1611        assert_eq!(server.receiver_connection_id,
1612                   server.sender_connection_id + 1);
1613
1614        assert!(server.state == SocketState::Closed);
1615        drop(server);
1616
1617        assert!(child.join().is_ok());
1618    }
1619
1620    #[test]
1621    fn test_socket_ipv6() {
1622        let server_addr = next_test_ip6();
1623
1624        let mut server = iotry!(UtpSocket::bind(server_addr));
1625        assert!(server.state == SocketState::New);
1626
1627        let child = thread::spawn(move || {
1628            let mut client = iotry!(UtpSocket::connect(server_addr));
1629            assert!(client.state == SocketState::Connected);
1630            // Check proper difference in client's send connection id and receive connection id
1631            assert_eq!(client.sender_connection_id,
1632                       client.receiver_connection_id + 1);
1633            assert_eq!(client.connected_to,
1634                       server_addr.to_socket_addrs().unwrap().next().unwrap());
1635            iotry!(client.close());
1636            drop(client);
1637        });
1638
1639        let mut buf = [0u8; BUF_SIZE];
1640        match server.recv_from(&mut buf) {
1641            e => println!("{:?}", e),
1642        }
1643        // After establishing a new connection, the server's ids are a mirror of the client's.
1644        assert_eq!(server.receiver_connection_id,
1645                   server.sender_connection_id + 1);
1646
1647        assert!(server.state == SocketState::Closed);
1648        drop(server);
1649
1650        assert!(child.join().is_ok());
1651    }
1652
1653    #[test]
1654    fn test_rendezvous_connect() {
1655        use std::net::{UdpSocket, Ipv4Addr, SocketAddrV4};
1656
1657        let peer1_udp_socket = iotry!(UdpSocket::bind("0.0.0.0:0"));
1658        let peer2_udp_socket = iotry!(UdpSocket::bind("0.0.0.0:0"));
1659
1660        let peer1_port = iotry!(peer1_udp_socket.local_addr()).port();
1661        let peer1_addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), peer1_port);
1662
1663        let peer2_port = iotry!(peer2_udp_socket.local_addr()).port();
1664        let peer2_addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), peer2_port);
1665
1666        const BUF_LEN: u32 = 16777216;
1667
1668        let tx_buffer: Vec<u8> = (0..BUF_LEN).map(|_| rand::random::<u8>()).collect();
1669
1670        let t = thread::spawn(move || {
1671            let mut peer1 = iotry!(UtpSocket::rendezvous_connect(peer1_udp_socket, peer2_addr));
1672            let mut sent_total = 0;
1673            while sent_total < tx_buffer.len() {
1674                let chunk_size = rand::random::<u16>() as usize + 1;
1675                let slice_end = ::std::cmp::min(tx_buffer.len(), sent_total + chunk_size);
1676                let sent = peer1.send_to(&tx_buffer[sent_total..slice_end]).unwrap();
1677                sent_total += sent;
1678            }
1679            let r = peer1.flush();
1680            r.unwrap();
1681            let _ = peer1.close();
1682            tx_buffer
1683        });
1684
1685        let mut peer2 = iotry!(UtpSocket::rendezvous_connect(peer2_udp_socket, peer1_addr));
1686        let mut rx_buffer: Vec<u8> = (0..BUF_LEN).into_iter().map(|_| 0u8).collect();
1687        let mut received_total = 0;
1688        while received_total < rx_buffer.len() {
1689            let chunk_size = rand::random::<u16>() as usize + 1;
1690            ::std::thread::sleep(::std::time::Duration::from_millis(1));
1691            let slice_end = ::std::cmp::min(rx_buffer.len(), received_total + chunk_size);
1692            let (received, _) = peer2.recv_from(&mut rx_buffer[received_total..slice_end]).unwrap();
1693            received_total += received;
1694        }
1695        let tx_buffer = t.join().unwrap();
1696        assert_eq!(tx_buffer, rx_buffer);
1697        let _ = peer2.close();
1698    }
1699
1700    #[test]
1701    fn test_recvfrom_on_closed_socket() {
1702        let server_addr = next_test_ip4();
1703
1704        let mut server = iotry!(UtpSocket::bind(server_addr));
1705        assert!(server.state == SocketState::New);
1706
1707        let child = thread::spawn(move || {
1708            let mut client = iotry!(UtpSocket::connect(server_addr));
1709            assert!(client.state == SocketState::Connected);
1710            assert!(client.close().is_ok());
1711        });
1712
1713        // Make the server listen for incoming connections until the end of the input
1714        let mut buf = [0u8; BUF_SIZE];
1715        let _resp = server.recv_from(&mut buf);
1716        assert!(server.state == SocketState::Closed);
1717
1718        // Trying to receive again returns `Ok(0)` (equivalent to the old `EndOfFile`)
1719        match server.recv_from(&mut buf) {
1720            Ok((0, _src)) => {}
1721            e => panic!("Expected Ok(0), got {:?}", e),
1722        }
1723        assert_eq!(server.state, SocketState::Closed);
1724
1725        assert!(child.join().is_ok());
1726    }
1727
1728    #[test]
1729    fn test_sendto_on_closed_socket() {
1730        let server_addr = next_test_ip4();
1731
1732        let mut server = iotry!(UtpSocket::bind(server_addr));
1733        assert!(server.state == SocketState::New);
1734
1735        let child = thread::spawn(move || {
1736            let mut client = iotry!(UtpSocket::connect(server_addr));
1737            assert!(client.state == SocketState::Connected);
1738            iotry!(client.close());
1739        });
1740
1741        // Make the server listen for incoming connections
1742        let mut buf = [0u8; BUF_SIZE];
1743        let (_read, _src) = iotry!(server.recv_from(&mut buf));
1744        assert_eq!(server.state, SocketState::Closed);
1745
1746        // Trying to send to the socket after closing it raises an error
1747        match server.send_to(&buf) {
1748            Err(ref e) if e.kind() == ErrorKind::NotConnected => (),
1749            v => panic!("expected {:?}, got {:?}", ErrorKind::NotConnected, v),
1750        }
1751
1752        assert!(child.join().is_ok());
1753    }
1754
1755    #[test]
1756    fn test_acks_on_socket() {
1757        use std::sync::mpsc::channel;
1758        let server_addr = next_test_ip4();
1759        let (tx, rx) = channel();
1760
1761        let mut server = iotry!(UtpSocket::bind(server_addr));
1762
1763        let child = thread::spawn(move || {
1764            // Make the server listen for incoming connections
1765            let mut buf = [0u8; BUF_SIZE];
1766            let _resp = server.recv(&mut buf, false);
1767            tx.send(server.seq_nr).unwrap();
1768
1769            // Close the connection
1770            iotry!(server.recv_from(&mut buf));
1771
1772            drop(server);
1773        });
1774
1775        let mut client = iotry!(UtpSocket::connect(server_addr));
1776        assert!(client.state == SocketState::Connected);
1777        let sender_seq_nr = rx.recv().unwrap();
1778        let ack_nr = client.ack_nr;
1779        assert!(ack_nr != 0);
1780        assert!(ack_nr.wrapping_add(1) == sender_seq_nr);
1781        assert!(client.close().is_ok());
1782
1783        // The reply to both connect (SYN) and close (FIN) should be
1784        // STATE packets, which don't increase the sequence number
1785        // and, hence, the receiver's acknowledgement number.
1786        assert!(client.ack_nr == ack_nr);
1787        drop(client);
1788
1789        assert!(child.join().is_ok());
1790    }
1791
1792    #[test]
1793    fn test_handle_packet() {
1794        // fn test_connection_setup() {
1795        let initial_connection_id: u16 = rand::random();
1796        let sender_connection_id = initial_connection_id + 1;
1797        let (server_addr, client_addr) = (next_test_ip4()
1798                                              .to_socket_addrs()
1799                                              .unwrap()
1800                                              .next()
1801                                              .unwrap(),
1802                                          next_test_ip4()
1803                                              .to_socket_addrs()
1804                                              .unwrap()
1805                                              .next()
1806                                              .unwrap());
1807        let mut socket = iotry!(UtpSocket::bind(server_addr));
1808
1809        let mut packet = Packet::new();
1810        packet.set_wnd_size(BUF_SIZE as u32);
1811        packet.set_type(PacketType::Syn);
1812        packet.set_connection_id(initial_connection_id);
1813
1814        // Do we have a response?
1815        let response = socket.handle_packet(&packet, client_addr);
1816        assert!(response.is_ok());
1817        let response = response.unwrap();
1818        assert!(response.is_some());
1819
1820        // Is is of the correct type?
1821        let response = response.unwrap();
1822        assert!(response.get_type() == PacketType::State);
1823
1824        // Same connection id on both ends during connection establishment
1825        assert!(response.connection_id() == packet.connection_id());
1826
1827        // Response acknowledges SYN
1828        assert!(response.ack_nr() == packet.seq_nr());
1829
1830        // No payload?
1831        assert!(response.payload.is_empty());
1832        // }
1833
1834        // ---------------------------------
1835
1836        // fn test_connection_usage() {
1837        let old_packet = packet;
1838        let old_response = response;
1839
1840        let mut packet = Packet::new();
1841        packet.set_type(PacketType::Data);
1842        packet.set_connection_id(sender_connection_id);
1843        packet.set_seq_nr(old_packet.seq_nr() + 1);
1844        packet.set_ack_nr(old_response.seq_nr());
1845
1846        let response = socket.handle_packet(&packet, client_addr);
1847        assert!(response.is_ok());
1848        let response = response.unwrap();
1849        assert!(response.is_some());
1850
1851        let response = response.unwrap();
1852        assert!(response.get_type() == PacketType::State);
1853
1854        // Sender (i.e., who the initiated connection and sent a SYN) has connection id equal to
1855        // initial connection id + 1
1856        // Receiver (i.e., who accepted connection) has connection id equal to initial connection id
1857        assert!(response.connection_id() == initial_connection_id);
1858        assert!(response.connection_id() == packet.connection_id() - 1);
1859
1860        // Previous packets should be ack'ed
1861        assert!(response.ack_nr() == packet.seq_nr());
1862
1863        // Responses with no payload should not increase the sequence number
1864        // unless it's the State packet sent to acknowledge the Syn packet as
1865        // explained at
1866        // <http://www.bittorrent.org/beps/bep_0029.html#connection-setup>
1867        assert!(response.payload.is_empty());
1868        assert!(response.seq_nr() == old_response.seq_nr().wrapping_add(1));
1869        // }
1870
1871        // fn test_connection_teardown() {
1872        let old_packet = packet;
1873        let old_response = response;
1874
1875        let mut packet = Packet::new();
1876        packet.set_type(PacketType::Fin);
1877        packet.set_connection_id(sender_connection_id);
1878        packet.set_seq_nr(old_packet.seq_nr() + 1);
1879        packet.set_ack_nr(old_response.seq_nr());
1880
1881        let response = socket.handle_packet(&packet, client_addr);
1882        assert!(response.is_ok());
1883        let response = response.unwrap();
1884        assert!(response.is_some());
1885
1886        let response = response.unwrap();
1887
1888        assert!(response.get_type() == PacketType::State);
1889
1890        // FIN packets have no payload but the sequence number shouldn't increase
1891        assert!(packet.seq_nr() == old_packet.seq_nr() + 1);
1892
1893        // Nor should the ACK packet's sequence number
1894        assert!(response.seq_nr() == old_response.seq_nr());
1895
1896        // FIN should be acknowledged
1897        assert!(response.ack_nr() == packet.seq_nr());
1898
1899        // }
1900    }
1901
1902    #[test]
1903    fn test_response_to_keepalive_ack() {
1904        // Boilerplate test setup
1905        let initial_connection_id: u16 = rand::random();
1906        let (server_addr, client_addr) = (next_test_ip4()
1907                                              .to_socket_addrs()
1908                                              .unwrap()
1909                                              .next()
1910                                              .unwrap(),
1911                                          next_test_ip4()
1912                                              .to_socket_addrs()
1913                                              .unwrap()
1914                                              .next()
1915                                              .unwrap());
1916        let mut socket = iotry!(UtpSocket::bind(server_addr));
1917
1918        // Establish connection
1919        let mut packet = Packet::new();
1920        packet.set_wnd_size(BUF_SIZE as u32);
1921        packet.set_type(PacketType::Syn);
1922        packet.set_connection_id(initial_connection_id);
1923
1924        let response = socket.handle_packet(&packet, client_addr);
1925        assert!(response.is_ok());
1926        let response = response.unwrap();
1927        assert!(response.is_some());
1928        let response = response.unwrap();
1929        assert!(response.get_type() == PacketType::State);
1930
1931        let old_packet = packet;
1932        let old_response = response;
1933
1934        // Now, send a keepalive packet
1935        let mut packet = Packet::new();
1936        packet.set_wnd_size(BUF_SIZE as u32);
1937        packet.set_type(PacketType::State);
1938        packet.set_connection_id(initial_connection_id);
1939        packet.set_seq_nr(old_packet.seq_nr() + 1);
1940        packet.set_ack_nr(old_response.seq_nr());
1941
1942        let response = socket.handle_packet(&packet, client_addr);
1943        assert!(response.is_ok());
1944        let response = response.unwrap();
1945        assert!(response.is_none());
1946
1947        // Send a second keepalive packet, identical to the previous one
1948        let response = socket.handle_packet(&packet, client_addr);
1949        assert!(response.is_ok());
1950        let response = response.unwrap();
1951        assert!(response.is_none());
1952
1953        // Mark socket as closed
1954        socket.state = SocketState::Closed;
1955    }
1956
1957    #[test]
1958    fn test_response_to_wrong_connection_id() {
1959        // Boilerplate test setup
1960        let initial_connection_id: u16 = rand::random();
1961        let (server_addr, client_addr) = (next_test_ip4()
1962                                              .to_socket_addrs()
1963                                              .unwrap()
1964                                              .next()
1965                                              .unwrap(),
1966                                          next_test_ip4()
1967                                              .to_socket_addrs()
1968                                              .unwrap()
1969                                              .next()
1970                                              .unwrap());
1971        let mut socket = iotry!(UtpSocket::bind(server_addr));
1972
1973        // Establish connection
1974        let mut packet = Packet::new();
1975        packet.set_wnd_size(BUF_SIZE as u32);
1976        packet.set_type(PacketType::Syn);
1977        packet.set_connection_id(initial_connection_id);
1978
1979        let response = socket.handle_packet(&packet, client_addr);
1980        assert!(response.is_ok());
1981        let response = response.unwrap();
1982        assert!(response.is_some());
1983        assert!(response.unwrap().get_type() == PacketType::State);
1984
1985        // Now, disrupt connection with a packet with an incorrect connection id
1986        let new_connection_id = initial_connection_id.wrapping_mul(2);
1987
1988        let mut packet = Packet::new();
1989        packet.set_wnd_size(BUF_SIZE as u32);
1990        packet.set_type(PacketType::State);
1991        packet.set_connection_id(new_connection_id);
1992
1993        let response = socket.handle_packet(&packet, client_addr);
1994        assert!(response.is_ok());
1995        let response = response.unwrap();
1996        assert!(response.is_some());
1997
1998        let response = response.unwrap();
1999        assert!(response.get_type() == PacketType::Reset);
2000        assert!(response.ack_nr() == packet.seq_nr());
2001
2002        // Mark socket as closed
2003        socket.state = SocketState::Closed;
2004    }
2005
2006    #[test]
2007    fn test_unordered_packets() {
2008        // Boilerplate test setup
2009        let initial_connection_id: u16 = rand::random();
2010        let (server_addr, client_addr) = (next_test_ip4()
2011                                              .to_socket_addrs()
2012                                              .unwrap()
2013                                              .next()
2014                                              .unwrap(),
2015                                          next_test_ip4()
2016                                              .to_socket_addrs()
2017                                              .unwrap()
2018                                              .next()
2019                                              .unwrap());
2020        let mut socket = iotry!(UtpSocket::bind(server_addr));
2021
2022        // Establish connection
2023        let mut packet = Packet::new();
2024        packet.set_wnd_size(BUF_SIZE as u32);
2025        packet.set_type(PacketType::Syn);
2026        packet.set_connection_id(initial_connection_id);
2027
2028        let response = socket.handle_packet(&packet, client_addr);
2029        assert!(response.is_ok());
2030        let response = response.unwrap();
2031        assert!(response.is_some());
2032        let response = response.unwrap();
2033        assert!(response.get_type() == PacketType::State);
2034
2035        let old_packet = packet;
2036        let old_response = response;
2037
2038        let mut window: Vec<Packet> = Vec::new();
2039
2040        // Now, send a keepalive packet
2041        let mut packet = Packet::new();
2042        packet.set_wnd_size(BUF_SIZE as u32);
2043        packet.set_type(PacketType::Data);
2044        packet.set_connection_id(initial_connection_id);
2045        packet.set_seq_nr(old_packet.seq_nr() + 1);
2046        packet.set_ack_nr(old_response.seq_nr());
2047        packet.payload = vec![1, 2, 3];
2048        window.push(packet);
2049
2050        let mut packet = Packet::new();
2051        packet.set_wnd_size(BUF_SIZE as u32);
2052        packet.set_type(PacketType::Data);
2053        packet.set_connection_id(initial_connection_id);
2054        packet.set_seq_nr(old_packet.seq_nr() + 2);
2055        packet.set_ack_nr(old_response.seq_nr());
2056        packet.payload = vec![4, 5, 6];
2057        window.push(packet);
2058
2059        // Send packets in reverse order
2060        let response = socket.handle_packet(&window[1], client_addr);
2061        assert!(response.is_ok());
2062        let response = response.unwrap();
2063        assert!(response.is_some());
2064        let response = response.unwrap();
2065        assert!(response.ack_nr() != window[1].seq_nr());
2066
2067        let response = socket.handle_packet(&window[0], client_addr);
2068        assert!(response.is_ok());
2069        let response = response.unwrap();
2070        assert!(response.is_some());
2071
2072        // Mark socket as closed
2073        socket.state = SocketState::Closed;
2074    }
2075
2076    #[test]
2077    fn test_socket_unordered_packets() {
2078        let server_addr = next_test_ip4();
2079
2080        let mut server = iotry!(UtpSocket::bind(server_addr));
2081        assert!(server.state == SocketState::New);
2082
2083        let child = thread::spawn(move || {
2084            let mut client = iotry!(UtpSocket::connect(server_addr));
2085            assert!(client.state == SocketState::Connected);
2086            // Check proper difference in client's send connection id and receive connection id
2087            assert_eq!(client.sender_connection_id,
2088                       client.receiver_connection_id + 1);
2089            let s = client.socket.try_clone().ok().expect("Error cloning internal UDP socket");
2090            let mut window: Vec<Packet> = Vec::new();
2091
2092            for data in (1..13u8).collect::<Vec<u8>>()[..].chunks(3) {
2093                let mut packet = Packet::new();
2094                packet.set_wnd_size(BUF_SIZE as u32);
2095                packet.set_type(PacketType::Data);
2096                packet.set_connection_id(client.sender_connection_id);
2097                packet.set_seq_nr(client.seq_nr);
2098                packet.set_ack_nr(client.ack_nr);
2099                packet.payload = data.to_vec();
2100                window.push(packet.clone());
2101                client.send_window.push(packet.clone());
2102                client.seq_nr += 1;
2103                client.curr_window += packet.len() as u32;
2104            }
2105
2106            let mut packet = Packet::new();
2107            packet.set_wnd_size(BUF_SIZE as u32);
2108            packet.set_type(PacketType::Fin);
2109            packet.set_connection_id(client.sender_connection_id);
2110            packet.set_seq_nr(client.seq_nr);
2111            packet.set_ack_nr(client.ack_nr);
2112            window.push(packet);
2113            client.seq_nr += 1;
2114
2115            iotry!(s.send_to(&window[3].to_bytes()[..], server_addr));
2116            iotry!(s.send_to(&window[2].to_bytes()[..], server_addr));
2117            iotry!(s.send_to(&window[1].to_bytes()[..], server_addr));
2118            iotry!(s.send_to(&window[0].to_bytes()[..], server_addr));
2119            iotry!(s.send_to(&window[4].to_bytes()[..], server_addr));
2120
2121            for _ in 0u8..2 {
2122                let mut buf = [0; BUF_SIZE];
2123                iotry!(s.recv_from(&mut buf));
2124            }
2125        });
2126
2127        let mut buf = [0; BUF_SIZE];
2128        let expected: Vec<u8> = (1..13u8).collect();
2129        let mut received: Vec<u8> = vec![];
2130        loop {
2131            match server.recv_from(&mut buf) {
2132                Ok((0, _src)) => break,
2133                Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2134                Err(e) => panic!("{:?}", e),
2135            }
2136        }
2137
2138        // After establishing a new connection, the server's ids are a mirror of the client's.
2139        assert_eq!(server.receiver_connection_id,
2140                   server.sender_connection_id + 1);
2141        assert_eq!(server.state, SocketState::Closed);
2142        assert_eq!(received.len(), expected.len());
2143        assert_eq!(received, expected);
2144
2145        assert!(child.join().is_ok());
2146    }
2147
2148    #[test]
2149    fn test_response_to_triple_ack() {
2150        let server_addr = next_test_ip4();
2151        let mut server = iotry!(UtpSocket::bind(server_addr));
2152
2153        // Fits in a packet
2154        const LEN: usize = 1024;
2155        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2156        let d = data.clone();
2157        assert_eq!(LEN, data.len());
2158
2159        let child = thread::spawn(move || {
2160            let mut client = iotry!(UtpSocket::connect(server_addr));
2161            iotry!(client.send_to(&d[..]));
2162            iotry!(client.close());
2163        });
2164
2165        let mut buf = [0; BUF_SIZE];
2166        // Expect SYN
2167        iotry!(server.recv(&mut buf, false));
2168
2169        // Receive data
2170        let data_packet = match server.socket.recv_from(&mut buf) {
2171            Ok((read, _src)) => iotry!(Packet::from_bytes(&buf[..read])),
2172            Err(e) => panic!("{}", e),
2173        };
2174        assert_eq!(data_packet.get_type(), PacketType::Data);
2175        assert_eq!(data_packet.payload, data);
2176        assert_eq!(data_packet.payload.len(), data.len());
2177
2178        // Send triple ACK
2179        let mut packet = Packet::new();
2180        packet.set_wnd_size(BUF_SIZE as u32);
2181        packet.set_type(PacketType::State);
2182        packet.set_seq_nr(server.seq_nr);
2183        packet.set_ack_nr(data_packet.seq_nr() - 1);
2184        packet.set_connection_id(server.sender_connection_id);
2185
2186        for _ in 0u8..3 {
2187            iotry!(server.socket.send_to(&packet.to_bytes()[..], server.connected_to));
2188        }
2189
2190        // Receive data again and check that it's the same we reported as missing
2191        let client_addr = server.connected_to;
2192        match server.socket.recv_from(&mut buf) {
2193            Ok((0, _)) => panic!("Received 0 bytes from socket"),
2194            Ok((read, _src)) => {
2195                let packet = iotry!(Packet::from_bytes(&buf[..read]));
2196                assert_eq!(packet.get_type(), PacketType::Data);
2197                assert_eq!(packet.seq_nr(), data_packet.seq_nr());
2198                assert!(packet.payload == data_packet.payload);
2199                let response = server.handle_packet(&packet, client_addr);
2200                assert!(response.is_ok());
2201                let response = response.unwrap();
2202                assert!(response.is_some());
2203                let response = response.unwrap();
2204                iotry!(server.socket.send_to(&response.to_bytes()[..], server.connected_to));
2205            }
2206            Err(e) => panic!("{}", e),
2207        }
2208
2209        // Receive close
2210        iotry!(server.recv_from(&mut buf));
2211
2212        assert!(child.join().is_ok());
2213    }
2214
2215    #[test]
2216    fn test_socket_timeout_request() {
2217        let (server_addr, client_addr) = (next_test_ip4()
2218                                              .to_socket_addrs()
2219                                              .unwrap()
2220                                              .next()
2221                                              .unwrap(),
2222                                          next_test_ip4()
2223                                              .to_socket_addrs()
2224                                              .unwrap()
2225                                              .next()
2226                                              .unwrap());
2227
2228        let client = iotry!(UtpSocket::bind(client_addr));
2229        let mut server = iotry!(UtpSocket::bind(server_addr));
2230        const LEN: usize = 512;
2231        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2232        let d = data.clone();
2233
2234        assert!(server.state == SocketState::New);
2235        assert!(client.state == SocketState::New);
2236
2237        // Check proper difference in client's send connection id and receive connection id
2238        assert_eq!(client.sender_connection_id,
2239                   client.receiver_connection_id + 1);
2240
2241        let child = thread::spawn(move || {
2242            let mut client = iotry!(UtpSocket::connect(server_addr));
2243            assert!(client.state == SocketState::Connected);
2244            assert_eq!(client.connected_to, server_addr);
2245            iotry!(client.send_to(&d[..]));
2246            drop(client);
2247        });
2248
2249        let mut buf = [0u8; BUF_SIZE];
2250        server.recv(&mut buf, false).unwrap();
2251        // After establishing a new connection, the server's ids are a mirror of the client's.
2252        assert_eq!(server.receiver_connection_id,
2253                   server.sender_connection_id + 1);
2254
2255        assert!(server.state == SocketState::Connected);
2256
2257        // Purposefully read from UDP socket directly and discard it, in order
2258        // to behave as if the packet was lost and thus trigger the timeout
2259        // handling in the *next* call to `UtpSocket.recv_from`.
2260        iotry!(server.socket.recv_from(&mut buf));
2261
2262        // Set a much smaller than usual timeout, for quicker test completion
2263        server.congestion_timeout = 50;
2264
2265        // Now wait for the previously discarded packet
2266        loop {
2267            match server.recv_from(&mut buf) {
2268                Ok((0, _)) => continue,
2269                Ok(_) => break,
2270                Err(e) => panic!("{}", e),
2271            }
2272        }
2273
2274        drop(server);
2275
2276        assert!(child.join().is_ok());
2277    }
2278
2279    #[test]
2280    fn test_sorted_buffer_insertion() {
2281        let server_addr = next_test_ip4();
2282        let mut socket = iotry!(UtpSocket::bind(server_addr));
2283
2284        let mut packet = Packet::new();
2285        packet.set_seq_nr(1);
2286
2287        assert!(socket.incoming_buffer.is_empty());
2288
2289        socket.insert_into_buffer(packet.clone());
2290        assert_eq!(socket.incoming_buffer.len(), 1);
2291
2292        packet.set_seq_nr(2);
2293        packet.set_timestamp_microseconds(128);
2294
2295        socket.insert_into_buffer(packet.clone());
2296        assert_eq!(socket.incoming_buffer.len(), 2);
2297        assert_eq!(socket.incoming_buffer[1].seq_nr(), 2);
2298        assert_eq!(socket.incoming_buffer[1].timestamp_microseconds(), 128);
2299
2300        packet.set_seq_nr(3);
2301        packet.set_timestamp_microseconds(256);
2302
2303        socket.insert_into_buffer(packet.clone());
2304        assert_eq!(socket.incoming_buffer.len(), 3);
2305        assert_eq!(socket.incoming_buffer[2].seq_nr(), 3);
2306        assert_eq!(socket.incoming_buffer[2].timestamp_microseconds(), 256);
2307
2308        // Replacing a packet with a more recent version doesn't work
2309        packet.set_seq_nr(2);
2310        packet.set_timestamp_microseconds(456);
2311
2312        socket.insert_into_buffer(packet.clone());
2313        assert_eq!(socket.incoming_buffer.len(), 3);
2314        assert_eq!(socket.incoming_buffer[1].seq_nr(), 2);
2315        assert_eq!(socket.incoming_buffer[1].timestamp_microseconds(), 128);
2316    }
2317
2318    #[test]
2319    fn test_duplicate_packet_handling() {
2320        let (server_addr, client_addr) = (next_test_ip4(), next_test_ip4());
2321
2322        let client = iotry!(UtpSocket::bind(client_addr));
2323        let mut server = iotry!(UtpSocket::bind(server_addr));
2324
2325        assert!(server.state == SocketState::New);
2326        assert!(client.state == SocketState::New);
2327
2328        // Check proper difference in client's send connection id and receive connection id
2329        assert_eq!(client.sender_connection_id,
2330                   client.receiver_connection_id + 1);
2331
2332        let child = thread::spawn(move || {
2333            let mut client = iotry!(UtpSocket::connect(server_addr));
2334            assert!(client.state == SocketState::Connected);
2335
2336            let mut packet = Packet::new();
2337            packet.set_wnd_size(BUF_SIZE as u32);
2338            packet.set_type(PacketType::Data);
2339            packet.set_connection_id(client.sender_connection_id);
2340            packet.set_seq_nr(client.seq_nr);
2341            packet.set_ack_nr(client.ack_nr);
2342            packet.payload = vec![1, 2, 3];
2343
2344            // Send two copies of the packet, with different timestamps
2345            for _ in 0u8..2 {
2346                packet.set_timestamp_microseconds(now_microseconds());
2347                iotry!(client.socket.send_to(&packet.to_bytes()[..], server_addr));
2348            }
2349            client.seq_nr += 1;
2350
2351            // Receive one ACK
2352            for _ in 0u8..1 {
2353                let mut buf = [0; BUF_SIZE];
2354                iotry!(client.socket.recv_from(&mut buf));
2355            }
2356
2357            iotry!(client.close());
2358        });
2359
2360        let mut buf = [0u8; BUF_SIZE];
2361        iotry!(server.recv(&mut buf, false));
2362        // After establishing a new connection, the server's ids are a mirror of the client's.
2363        assert_eq!(server.receiver_connection_id,
2364                   server.sender_connection_id + 1);
2365
2366        assert!(server.state == SocketState::Connected);
2367
2368        let expected: Vec<u8> = vec![1, 2, 3];
2369        let mut received: Vec<u8> = vec![];
2370        loop {
2371            match server.recv_from(&mut buf) {
2372                Ok((0, _src)) => break,
2373                Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2374                Err(e) => panic!("{:?}", e),
2375            }
2376        }
2377        assert_eq!(received.len(), expected.len());
2378        assert_eq!(received, expected);
2379
2380        assert!(child.join().is_ok());
2381    }
2382
2383    // #[test]
2384    // #[ignore]
2385    // fn test_selective_ack_response() {
2386    //     let (server_addr, client_addr) = (next_test_ip4(), next_test_ip4());
2387    //     const LEN: usize = 1024 * 10;
2388    //     let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2389    //     let to_send = data.clone();
2390
2391    //     // Client
2392    //     thread::spawn(move || {
2393    //         let client = iotry!(UtpSocket::bind(client_addr));
2394    //         let mut client = iotry!(UtpSocket::connect(server_addr));
2395    //         client.congestion_timeout = 50;
2396
2397    //         iotry!(client.send_to(&to_send[..]));
2398    //         iotry!(client.close());
2399    //     });
2400
2401    //     // Server
2402    //     let mut server = iotry!(UtpSocket::bind(server_addr));
2403
2404    // let mut buf = [0; BUF_SIZE];
2405
2406    //     // Connect
2407    //     iotry!(server.recv_from(&mut buf));
2408
2409    //     // Discard packets
2410    //     iotry!(server.socket.recv_from(&mut buf));
2411    //     iotry!(server.socket.recv_from(&mut buf));
2412    //     iotry!(server.socket.recv_from(&mut buf));
2413
2414    //     // Generate SACK
2415    //     let mut packet = Packet::new();
2416    //     packet.set_seq_nr(server.seq_nr);
2417    //     packet.set_ack_nr(server.ack_nr - 1);
2418    //     packet.set_connection_id(server.sender_connection_id);
2419    //     packet.set_timestamp_microseconds(now_microseconds());
2420    //     packet.set_type(PacketType::State);
2421    //     packet.set_sack(vec!(12, 0, 0, 0));
2422
2423    //     // Send SACK
2424    //     iotry!(server.socket.send_to(&packet.to_bytes()[..], server.connected_to.clone()));
2425
2426    //     // Expect to receive "missing" packets
2427    //     let mut received: Vec<u8> = vec!();
2428    //     loop {
2429    //         match server.recv_from(&mut buf) {
2430    //             Ok((0, _src)) => break,
2431    //             Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2432    //             Err(e) => panic!("{:?}", e)
2433    //         }
2434    //     }
2435    //     assert!(!received.is_empty());
2436    //     assert_eq!(received.len(), data.len());
2437    //     assert_eq!(received, data);
2438    // }
2439
2440    #[test]
2441    fn test_correct_packet_loss() {
2442        let server_addr = next_test_ip4();
2443
2444        let mut server = iotry!(UtpSocket::bind(server_addr));
2445        const LEN: usize = 1024 * 10;
2446        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2447        let to_send = data.clone();
2448
2449        let child = thread::spawn(move || {
2450            let mut client = iotry!(UtpSocket::connect(server_addr));
2451
2452            // Send everything except the odd chunks
2453            let chunks = to_send[..].chunks(BUF_SIZE);
2454            let dst = client.connected_to;
2455            for (index, chunk) in chunks.enumerate() {
2456                let mut packet = Packet::new();
2457                packet.set_seq_nr(client.seq_nr);
2458                packet.set_ack_nr(client.ack_nr);
2459                packet.set_connection_id(client.sender_connection_id);
2460                packet.set_timestamp_microseconds(now_microseconds());
2461                packet.payload = chunk.to_vec();
2462                packet.set_type(PacketType::Data);
2463
2464                if index % 2 == 0 {
2465                    iotry!(client.socket.send_to(&packet.to_bytes()[..], dst));
2466                }
2467
2468                client.curr_window += packet.len() as u32;
2469                client.send_window.push(packet);
2470                client.seq_nr += 1;
2471            }
2472
2473            iotry!(client.close());
2474        });
2475
2476        let mut buf = [0; BUF_SIZE];
2477        let mut received: Vec<u8> = vec![];
2478        loop {
2479            match server.recv_from(&mut buf) {
2480                Ok((0, _src)) => break,
2481                Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2482                Err(e) => panic!("{}", e),
2483            }
2484        }
2485        assert_eq!(received.len(), data.len());
2486        assert_eq!(received, data);
2487
2488        assert!(child.join().is_ok());
2489    }
2490
2491    #[test]
2492    fn test_tolerance_to_small_buffers() {
2493        let server_addr = next_test_ip4();
2494        let mut server = iotry!(UtpSocket::bind(server_addr));
2495        const LEN: usize = 1024;
2496        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2497        let to_send = data.clone();
2498
2499        let child = thread::spawn(move || {
2500            let mut client = iotry!(UtpSocket::connect(server_addr));
2501            iotry!(client.send_to(&to_send[..]));
2502            iotry!(client.close());
2503        });
2504
2505        let mut read = Vec::new();
2506        while server.state != SocketState::Closed {
2507            let mut small_buffer = [0; 512];
2508            match server.recv_from(&mut small_buffer) {
2509                Ok((0, _src)) => break,
2510                Ok((len, _src)) => read.extend(small_buffer[..len].to_vec()),
2511                Err(e) => panic!("{}", e),
2512            }
2513        }
2514
2515        assert_eq!(read.len(), data.len());
2516        assert_eq!(read, data);
2517
2518        assert!(child.join().is_ok());
2519    }
2520
2521    #[test]
2522    fn test_sequence_number_rollover() {
2523        let (server_addr, client_addr) = (next_test_ip4(), next_test_ip4());
2524
2525        let mut server = iotry!(UtpSocket::bind(server_addr));
2526
2527        const LEN: usize = BUF_SIZE * 4;
2528        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2529        let to_send = data.clone();
2530
2531        let child = thread::spawn(move || {
2532            let mut client = iotry!(UtpSocket::bind(client_addr));
2533
2534            // Advance socket's sequence number
2535            client.seq_nr = ::std::u16::MAX - (to_send.len() / (BUF_SIZE * 2)) as u16;
2536
2537            let mut client = iotry!(UtpSocket::connect(server_addr));
2538            // Send enough data to rollover
2539            iotry!(client.send_to(&to_send[..]));
2540            // Check that the sequence number did rollover
2541            assert!(client.seq_nr < 50);
2542            // Close connection
2543            iotry!(client.close());
2544        });
2545
2546        let mut buf = [0; BUF_SIZE];
2547        let mut received: Vec<u8> = vec![];
2548        loop {
2549            match server.recv_from(&mut buf) {
2550                Ok((0, _src)) => break,
2551                Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2552                Err(e) => panic!("{}", e),
2553            }
2554        }
2555        assert_eq!(received.len(), data.len());
2556        assert_eq!(received, data);
2557
2558        assert!(child.join().is_ok());
2559    }
2560
2561    #[test]
2562    fn test_drop_unused_socket() {
2563        let server_addr = next_test_ip4();
2564        let server = iotry!(UtpSocket::bind(server_addr));
2565
2566        // Explicitly dropping socket. This test should not hang.
2567        drop(server);
2568    }
2569
2570    #[test]
2571    fn test_invalid_packet_on_connect() {
2572        use std::net::UdpSocket;
2573        let server_addr = next_test_ip4();
2574        let server = iotry!(UdpSocket::bind(server_addr));
2575
2576        let child = thread::spawn(move || {
2577            let mut buf = [0; BUF_SIZE];
2578            match server.recv_from(&mut buf) {
2579                Ok((_len, client_addr)) => {
2580                    iotry!(server.send_to(&[], client_addr));
2581                }
2582                _ => panic!(),
2583            }
2584        });
2585
2586        match UtpSocket::connect(server_addr) {
2587            Err(ref e) if e.kind() == ErrorKind::Other => (), // OK
2588            Err(e) => panic!("Expected ErrorKind::Other, got {:?}", e),
2589            Ok(_) => panic!("Expected Err, got Ok"),
2590        }
2591
2592        assert!(child.join().is_ok());
2593    }
2594
2595    #[test]
2596    fn test_receive_unexpected_reply_type_on_connect() {
2597        use std::net::UdpSocket;
2598        let server_addr = next_test_ip4();
2599        let server = iotry!(UdpSocket::bind(server_addr));
2600
2601        let child = thread::spawn(move || {
2602            let mut buf = [0; BUF_SIZE];
2603            let mut packet = Packet::new();
2604            packet.set_type(PacketType::Data);
2605
2606            match server.recv_from(&mut buf) {
2607                Ok((_len, client_addr)) => {
2608                    iotry!(server.send_to(&packet.to_bytes()[..], client_addr));
2609                }
2610                _ => panic!(),
2611            }
2612        });
2613
2614        match UtpSocket::connect(server_addr) {
2615            Err(ref e) if e.kind() == ErrorKind::TimedOut => (), // OK
2616            Err(e) => panic!("Expected ErrorKind::TimedOut, got {:?}", e),
2617            Ok(_) => panic!("Expected Err, got Ok"),
2618        }
2619
2620        assert!(child.join().is_ok());
2621    }
2622
2623    #[test]
2624    fn test_receiving_syn_on_established_connection() {
2625        // Establish connection
2626        let server_addr = next_test_ip4();
2627        let mut server = iotry!(UtpSocket::bind(server_addr));
2628
2629        let child = thread::spawn(move || {
2630            let mut buf = [0; BUF_SIZE];
2631            loop {
2632                match server.recv_from(&mut buf) {
2633                    Ok((0, _src)) => break,
2634                    Ok(_) => (),
2635                    Err(e) => panic!("{:?}", e),
2636                }
2637            }
2638        });
2639
2640        let mut client = iotry!(UtpSocket::connect(server_addr));
2641        let mut packet = Packet::new();
2642        packet.set_wnd_size(BUF_SIZE as u32);
2643        packet.set_type(PacketType::Syn);
2644        packet.set_connection_id(client.sender_connection_id);
2645        packet.set_seq_nr(client.seq_nr);
2646        packet.set_ack_nr(client.ack_nr);
2647
2648        let other_socket = iotry!(::std::net::UdpSocket::bind("0.0.0.0:0"));
2649
2650        iotry!(other_socket.send_to(&packet.to_bytes()[..], server_addr));
2651
2652        let mut buf = [0; BUF_SIZE];
2653        match other_socket.recv_from(&mut buf) {
2654            Ok((len, _src)) => {
2655                let reply = Packet::from_bytes(&buf[..len]).ok().unwrap();
2656                assert_eq!(reply.get_type(), PacketType::Reset);
2657            }
2658            Err(e) => panic!("{:?}", e),
2659        }
2660        iotry!(client.close());
2661
2662        assert!(child.join().is_ok());
2663    }
2664
2665    #[test]
2666    fn test_receiving_reset_on_established_connection() {
2667        // Establish connection
2668        let server_addr = next_test_ip4();
2669        let mut server = iotry!(UtpSocket::bind(server_addr));
2670
2671        let child = thread::spawn(move || {
2672            let client = iotry!(UtpSocket::connect(server_addr));
2673            let mut packet = Packet::new();
2674            packet.set_wnd_size(BUF_SIZE as u32);
2675            packet.set_type(PacketType::Reset);
2676            packet.set_connection_id(client.sender_connection_id);
2677            packet.set_seq_nr(client.seq_nr);
2678            packet.set_ack_nr(client.ack_nr);
2679            iotry!(client.socket.send_to(&packet.to_bytes()[..], server_addr));
2680            let mut buf = [0; BUF_SIZE];
2681            match client.socket.recv_from(&mut buf) {
2682                Ok((_len, _src)) => (),
2683                Err(e) => panic!("{:?}", e),
2684            }
2685        });
2686
2687        let mut buf = [0; BUF_SIZE];
2688        loop {
2689            match server.recv_from(&mut buf) {
2690                Ok((0, _src)) => break,
2691                Ok(_) => (),
2692                Err(ref e) if e.kind() == ErrorKind::ConnectionReset => return,
2693                Err(e) => panic!("{:?}", e),
2694            }
2695        }
2696        assert!(child.join().is_ok());
2697        panic!("Should have received Reset");
2698    }
2699
2700    #[test]
2701    fn test_premature_fin() {
2702        let (server_addr, client_addr) = (next_test_ip4(), next_test_ip4());
2703        let mut server = iotry!(UtpSocket::bind(server_addr));
2704
2705        const LEN: usize = BUF_SIZE * 4;
2706        let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
2707        let to_send = data.clone();
2708
2709        let child = thread::spawn(move || {
2710            let mut client = iotry!(UtpSocket::connect(server_addr));
2711            iotry!(client.send_to(&to_send[..]));
2712            iotry!(client.close());
2713        });
2714
2715        let mut buf = [0; BUF_SIZE];
2716
2717        // Accept connection
2718        iotry!(server.recv(&mut buf, false));
2719
2720        // Send FIN without acknowledging packets received
2721        let mut packet = Packet::new();
2722        packet.set_connection_id(server.sender_connection_id);
2723        packet.set_seq_nr(server.seq_nr);
2724        packet.set_ack_nr(server.ack_nr);
2725        packet.set_timestamp_microseconds(now_microseconds());
2726        packet.set_type(PacketType::Fin);
2727        iotry!(server.socket.send_to(&packet.to_bytes()[..], client_addr));
2728
2729        // Receive until end
2730        let mut received: Vec<u8> = vec![];
2731        loop {
2732            match server.recv_from(&mut buf) {
2733                Ok((0, _src)) => break,
2734                Ok((len, _src)) => received.extend(buf[..len].to_vec()),
2735                Err(e) => panic!("{}", e),
2736            }
2737        }
2738        assert_eq!(received.len(), data.len());
2739        assert_eq!(received, data);
2740
2741        assert!(child.join().is_ok());
2742    }
2743
2744    #[test]
2745    fn test_base_delay_calculation() {
2746        let minute_in_microseconds = 60 * 10i64.pow(6);
2747        let samples = vec![(0, 10),
2748                           (1, 8),
2749                           (2, 12),
2750                           (3, 7),
2751                           (minute_in_microseconds + 1, 11),
2752                           (minute_in_microseconds + 2, 19),
2753                           (minute_in_microseconds + 3, 9)];
2754        let addr = next_test_ip4();
2755        let mut socket = UtpSocket::bind(addr).unwrap();
2756
2757        for (timestamp, delay) in samples {
2758            socket.update_base_delay(delay, timestamp + delay);
2759        }
2760
2761        let expected = vec![7, 9];
2762        let actual = socket.base_delays.iter().map(|&x| x).collect::<Vec<_>>();
2763        assert_eq!(expected, actual);
2764        assert_eq!(socket.min_base_delay(), 7);
2765    }
2766
2767    #[test]
2768    fn test_local_addr() {
2769        let addr = next_test_ip4();
2770        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
2771        let socket = UtpSocket::bind(addr).unwrap();
2772
2773        assert!(socket.local_addr().is_ok());
2774        assert_eq!(socket.local_addr().unwrap(), addr);
2775    }
2776
2777    #[test]
2778    fn test_listener_local_addr() {
2779        let addr = next_test_ip4();
2780        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
2781        let listener = UtpListener::bind(addr).unwrap();
2782
2783        assert!(listener.local_addr().is_ok());
2784        assert_eq!(listener.local_addr().unwrap(), addr);
2785    }
2786
2787    #[test]
2788    fn test_peer_addr() {
2789        use std::sync::mpsc::channel;
2790        let addr = next_test_ip4();
2791        let server_addr = addr.to_socket_addrs().unwrap().next().unwrap();
2792        let mut server = UtpSocket::bind(server_addr).unwrap();
2793        let (tx, rx) = channel();
2794
2795        // `peer_addr` should return an error because the socket isn't connected yet
2796        assert!(server.peer_addr().is_err());
2797
2798        let child = thread::spawn(move || {
2799            let mut client = iotry!(UtpSocket::connect(server_addr));
2800            let mut buf = [0; 1024];
2801            iotry!(tx.send(client.local_addr()));
2802            iotry!(client.recv_from(&mut buf));
2803        });
2804
2805        // Wait for a connection to be established
2806        let mut buf = [0; 1024];
2807        iotry!(server.recv(&mut buf, false));
2808
2809        // `peer_addr` should succeed and be equal to the client's address
2810        assert!(server.peer_addr().is_ok());
2811        // The client is expected to be bound to "0.0.0.0", so we can only check if the port is
2812        // correct
2813        let client_addr = rx.recv().unwrap().unwrap();
2814        assert_eq!(server.peer_addr().unwrap().port(), client_addr.port());
2815
2816        // Close the connection
2817        iotry!(server.close());
2818
2819        // `peer_addr` should now return an error because the socket is closed
2820        assert!(server.peer_addr().is_err());
2821
2822        assert!(child.join().is_ok());
2823    }
2824
2825    #[test]
2826    fn test_take_address() {
2827        // Expected successes
2828        assert!(take_address(("0.0.0.0:0")).is_ok());
2829        assert!(take_address((":::0")).is_ok());
2830        assert!(take_address(("0.0.0.0", 0)).is_ok());
2831        assert!(take_address(("::", 0)).is_ok());
2832        assert!(take_address(("1.2.3.4", 5)).is_ok());
2833
2834        // Expected failures
2835        assert!(take_address("999.0.0.0:0").is_err());
2836        assert!(take_address(("1.2.3.4:70000")).is_err());
2837        assert!(take_address("").is_err());
2838        assert!(take_address("this is not an address").is_err());
2839        assert!(take_address("no.dns.resolution.com").is_err());
2840    }
2841
2842    // Test reaction to connection loss when sending data packets
2843    #[test]
2844    fn test_connection_loss_data() {
2845        let server_addr = next_test_ip4();
2846        let mut server = iotry!(UtpSocket::bind(server_addr));
2847        // Decrease timeouts for faster tests
2848        server.congestion_timeout = 1;
2849        let attempts = server.max_retransmission_retries;
2850
2851        let child = thread::spawn(move || {
2852            let mut client = iotry!(UtpSocket::connect(server_addr));
2853            iotry!(client.send_to(&[0]));
2854            // Simulate connection loss by killing the socket.
2855            client.state = SocketState::Closed;
2856            let socket = client.socket.try_clone().unwrap();
2857            let mut buf = [0; BUF_SIZE];
2858            iotry!(socket.recv_from(&mut buf));
2859            for _ in 0..attempts {
2860                match socket.recv_from(&mut buf) {
2861                    Ok((len, _src)) => {
2862                        assert_eq!(Packet::from_bytes(&buf[..len]).unwrap().get_type(),
2863                                   PacketType::Data)
2864                    }
2865                    Err(e) => panic!("{}", e),
2866                }
2867            }
2868        });
2869
2870        // Drain incoming packets
2871        let mut buf = [0; BUF_SIZE];
2872        iotry!(server.recv_from(&mut buf));
2873
2874        iotry!(server.send_to(&[0]));
2875
2876        // Try to receive ACKs, time out too many times on flush, and fail with `TimedOut`
2877        let mut buf = [0; BUF_SIZE];
2878        match server.recv(&mut buf, false) {
2879            Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
2880            x => panic!("Expected Err(TimedOut), got {:?}", x),
2881        }
2882
2883        assert!(child.join().is_ok());
2884    }
2885
2886    // Test reaction to connection loss when sending FIN
2887    #[test]
2888    fn test_connection_loss_fin() {
2889        let server_addr = next_test_ip4();
2890        let mut server = iotry!(UtpSocket::bind(server_addr));
2891        // Decrease timeouts for faster tests
2892        server.congestion_timeout = 1;
2893        let attempts = server.max_retransmission_retries;
2894
2895        let child = thread::spawn(move || {
2896            let mut client = iotry!(UtpSocket::connect(server_addr));
2897            iotry!(client.send_to(&[0]));
2898            // Simulate connection loss by killing the socket.
2899            client.state = SocketState::Closed;
2900            let socket = client.socket.try_clone().unwrap();
2901            let mut buf = [0; BUF_SIZE];
2902            iotry!(socket.recv_from(&mut buf));
2903            for _ in 0..attempts {
2904                match socket.recv_from(&mut buf) {
2905                    Ok((len, _src)) => {
2906                        assert_eq!(Packet::from_bytes(&buf[..len]).unwrap().get_type(),
2907                                   PacketType::Fin)
2908                    }
2909                    Err(e) => panic!("{}", e),
2910                }
2911            }
2912        });
2913
2914        // Drain incoming packets
2915        let mut buf = [0; BUF_SIZE];
2916        iotry!(server.recv_from(&mut buf));
2917
2918        // Send FIN, time out too many times, and fail with `TimedOut`
2919        match server.close() {
2920            Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
2921            x => panic!("Expected Err(TimedOut), got {:?}", x),
2922        }
2923        assert!(child.join().is_ok());
2924    }
2925
2926    // Test reaction to connection loss when waiting for data packets
2927    #[test]
2928    fn test_connection_loss_waiting() {
2929        let server_addr = next_test_ip4();
2930        let mut server = iotry!(UtpSocket::bind(server_addr));
2931        // Decrease timeouts for faster tests
2932        server.congestion_timeout = 1;
2933        let attempts = server.max_retransmission_retries;
2934
2935        let child = thread::spawn(move || {
2936            let mut client = iotry!(UtpSocket::connect(server_addr));
2937            iotry!(client.send_to(&[0]));
2938            // Simulate connection loss by killing the socket.
2939            client.state = SocketState::Closed;
2940            let socket = client.socket.try_clone().unwrap();
2941            let seq_nr = client.seq_nr;
2942            let mut buf = [0; BUF_SIZE];
2943            for _ in 0..(3 * attempts) {
2944                match socket.recv_from(&mut buf) {
2945                    Ok((len, _src)) => {
2946                        let packet = iotry!(Packet::from_bytes(&buf[..len]));
2947                        assert_eq!(packet.get_type(), PacketType::State);
2948                        assert_eq!(packet.ack_nr(), seq_nr - 1);
2949                    }
2950                    Err(e) => panic!("{}", e),
2951                }
2952            }
2953        });
2954
2955        // Drain incoming packets
2956        let mut buf = [0; BUF_SIZE];
2957        iotry!(server.recv_from(&mut buf));
2958
2959        // Try to receive data, time out too many times, and fail with `TimedOut`
2960        let mut buf = [0; BUF_SIZE];
2961        match server.recv_from(&mut buf) {
2962            Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
2963            x => panic!("Expected Err(TimedOut), got {:?}", x),
2964        }
2965        assert!(child.join().is_ok());
2966    }
2967
2968    const NETWORK_NODE_COUNT: usize = 20;
2969    const NETWORK_MSG_COUNT: usize = 5;
2970
2971    fn test_network(exchange: fn(&mut UtpSocket) -> ()) {
2972        use std::net::SocketAddr;
2973        use std::thread::{JoinHandle, spawn};
2974
2975        const NODE_COUNT: usize = NETWORK_NODE_COUNT;
2976
2977        struct Node {
2978            listener: UtpListener,
2979        }
2980
2981        impl Node {
2982            fn new() -> Node {
2983                Node { listener: iotry!(UtpListener::bind("127.0.0.1:0")) }
2984            }
2985
2986            fn run(&mut self, exchange: fn(&mut UtpSocket) -> (), peer_addrs: Vec<SocketAddr>) {
2987                let connect_cnt = peer_addrs.len();
2988
2989                let connect_join_handle = spawn(move || {
2990                    let mut send_jhs = Vec::<JoinHandle<()>>::new();
2991
2992                    for peer_addr in peer_addrs {
2993                        send_jhs.push(spawn(move || {
2994                            let mut socket = iotry!(UtpSocket::connect(peer_addr));
2995                            exchange(&mut socket);
2996                        }));
2997                    }
2998
2999                    for jh in send_jhs {
3000                        iotry!(jh.join());
3001                    }
3002                });
3003
3004                let mut recv_jhs = Vec::<JoinHandle<()>>::new();
3005
3006                for _ in 0..NODE_COUNT-1-connect_cnt {
3007                    let mut socket = iotry!(self.listener.accept()).0;
3008                    recv_jhs.push(spawn(move || {
3009                        exchange(&mut socket);
3010                    }));
3011                }
3012
3013                for jh in recv_jhs {
3014                    iotry!(jh.join());
3015                }
3016
3017                iotry!(connect_join_handle.join());
3018            }
3019        }
3020
3021        let mut nodes = Vec::<Node>::new();
3022
3023        for _ in 0..NODE_COUNT {
3024            nodes.push(Node::new());
3025        }
3026
3027        let listening_addrs = nodes.iter()
3028                                   .map(|n| iotry!(n.listener.local_addr()))
3029                                   .collect::<Vec<_>>();
3030
3031        let mut join_handles = Vec::<JoinHandle<()>>::new();
3032
3033        let mut ni: usize = 0;
3034        for mut node in nodes {
3035            let mut addrs = Vec::<SocketAddr>::new();
3036
3037            for ai in 0..listening_addrs.len() {
3038                if ai <= ni { continue }
3039                addrs.push(listening_addrs[ai].clone());
3040            }
3041
3042            join_handles.push(spawn(move || {
3043                node.run(exchange, addrs);
3044            }));
3045
3046            ni += 1;
3047        }
3048
3049        for handle in join_handles {
3050            iotry!(handle.join());
3051        }
3052    }
3053
3054    #[test]
3055    fn test_network_no_timeout() {
3056        static MSG_COUNT: usize  = NETWORK_MSG_COUNT;
3057
3058        fn make_buf(i: usize) -> [u8; 10] {
3059            let mut buf = [0; 10];
3060            for j in 0..10 {
3061                buf[j] = (i + j) as u8;
3062            }
3063            buf
3064        }
3065
3066        fn sequential_exchange(socket: &mut UtpSocket) {
3067            let mut i = 0;
3068            let from = socket.socket.local_addr().map(|addr| addr.port()).unwrap_or(0);
3069            let to   = socket.connected_to.port();
3070
3071            while i < MSG_COUNT {
3072                let tx_buf = make_buf(i);
3073                assert_eq!(iotry!(socket.send_to(&tx_buf)), tx_buf.len());
3074                let mut buf = [0; 10];
3075
3076                match socket.recv_from(&mut buf) {
3077                    Ok((cnt, _)) => {
3078                        if cnt == 0 {
3079                            if socket.state != SocketState::Connected {
3080                                panic!("socket is in an invalid state \"{:?}\" from {:?} to {:?}",
3081                                         socket.state, from, to);
3082                            }
3083                        }
3084                        assert_eq!(cnt, 10);
3085                        if buf != make_buf(i) {
3086                            panic!("expected {:?} but received {:?} in recv step {}",
3087                                   make_buf(i),
3088                                   buf,
3089                                   i);
3090                        }
3091                    },
3092                    Err(err) => {
3093                        panic!("Recv error {:?}; from {:?} to {:?}", err, from, to);
3094                    }
3095                }
3096                i += 1;
3097            }
3098        }
3099
3100        for i in 0..100 {
3101            println!("------ Testing Network iteration {}", i);
3102            test_network(sequential_exchange);
3103        }
3104    }
3105
3106    #[test]
3107    fn test_network_with_timeout() {
3108        static MSG_COUNT: usize  = NETWORK_MSG_COUNT;
3109
3110        fn make_buf(i: usize) -> [u8; 10] {
3111            let mut buf = [0; 10];
3112            for j in 0..10 {
3113                buf[j] = (i + j) as u8;
3114            }
3115            buf
3116        }
3117
3118        fn timeout_exchange(socket: &mut UtpSocket) {
3119            socket.set_read_timeout(Some(50));
3120            let mut recv_cnt = 0;
3121            let mut send_cnt = 0;
3122
3123            let from = socket.socket.local_addr().map(|addr| addr.port()).unwrap_or(0);
3124            let to   = socket.connected_to.port();
3125
3126            loop {
3127                if send_cnt < MSG_COUNT {
3128                    let tx_buf = make_buf(send_cnt);
3129
3130                    match socket.send_to(&tx_buf) {
3131                        Ok(cnt) => {
3132                            assert_eq!(cnt, tx_buf.len());
3133                            send_cnt += 1;
3134                        }
3135                        Err(ref e) if e.kind() == ErrorKind::TimedOut => {}
3136                        Err(e) => {
3137                            panic!("{:?}", e);
3138                        }
3139                    }
3140                }
3141                if recv_cnt < MSG_COUNT {
3142                    let exp_buf = make_buf(recv_cnt);
3143
3144                    let mut buf = [0; 10];
3145                    match socket.recv_from(&mut buf) {
3146                        Ok((cnt, _)) => {
3147                            if cnt == 0 {
3148                                if socket.state != SocketState::Connected {
3149                                    panic!("socket is in an invalid state \"{:?}\" \
3150                                           from {:?} to {:?} in receive #{}",
3151                                             socket.state, from, to, recv_cnt);
3152                                }
3153                            } else {
3154                                assert_eq!(cnt, exp_buf.len());
3155                                assert_eq!(buf, exp_buf);
3156                                recv_cnt += 1;
3157                            }
3158                        },
3159                        Err(ref e) if e.kind() == ErrorKind::TimedOut => {
3160                        },
3161                        Err(e) => {
3162                            panic!("{:?} recv_cnt={} send_cnt={}", e, recv_cnt, send_cnt);
3163                        }
3164                    }
3165                }
3166
3167                if send_cnt == MSG_COUNT && recv_cnt == MSG_COUNT {
3168                    break;
3169                }
3170            }
3171        }
3172
3173
3174        for i in 0..100 {
3175            println!("------ Testing Network iteration {}", i);
3176            test_network(timeout_exchange);
3177        }
3178    }
3179
3180    #[test]
3181    fn test_send_client_to_server() {
3182        let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
3183        let server_addr = iotry!(listener.local_addr());
3184
3185        static TX_BUF: [u8; 10] = [0,1,2,3,4,5,6,7,8,9];
3186
3187        let client_t = thread::spawn(move || {
3188            let mut client = iotry!(UtpSocket::connect(server_addr));
3189            assert_eq!(iotry!(client.send_to(&TX_BUF)), TX_BUF.len());
3190        });
3191
3192        let mut server = iotry!(listener.accept()).0;
3193
3194        let mut buf = [0; 10];
3195        iotry!(server.recv_from(&mut buf));
3196        assert_eq!(buf, TX_BUF);
3197
3198        assert!(client_t.join().is_ok());
3199    }
3200
3201    // Test data exchange
3202    #[test]
3203    fn test_send_server_to_client() {
3204        let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
3205        let server_addr = iotry!(listener.local_addr());
3206
3207        static TX_BUF: [u8; 10] = [0,1,2,3,4,5,6,7,8,9];
3208
3209        let client_t = thread::spawn(move || {
3210            let mut client = iotry!(UtpSocket::connect(server_addr));
3211            let mut buf = [0; 10];
3212            iotry!(client.recv_from(&mut buf));
3213            assert_eq!(buf, TX_BUF);
3214        });
3215
3216        let mut server = iotry!(listener.accept()).0;
3217
3218        assert_eq!(iotry!(server.send_to(&TX_BUF)), TX_BUF.len());
3219        let fr = server.flush();
3220        assert!(fr.is_ok());
3221
3222        assert!(client_t.join().is_ok());
3223    }
3224
3225    // Test data exchange
3226    #[test]
3227    fn test_data_exchange_utp() {
3228        let listener = iotry!(UtpListener::bind("127.0.0.1:0"));
3229        let server_addr = iotry!(listener.local_addr());
3230
3231        static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
3232
3233        let client_t = thread::spawn(move || {
3234            let mut client = iotry!(UtpSocket::connect(server_addr));
3235            assert_eq!(iotry!(client.send_to(&TX_BUF)), TX_BUF.len());
3236            let mut buf = [0; 10];
3237            iotry!(client.recv_from(&mut buf));
3238            assert_eq!(buf, TX_BUF);
3239        });
3240
3241        let mut server = iotry!(listener.accept()).0;
3242
3243        assert_eq!(iotry!(server.send_to(&TX_BUF)), TX_BUF.len());
3244        let mut buf = [0; 10];
3245        iotry!(server.recv_from(&mut buf));
3246        assert_eq!(buf, TX_BUF);
3247        let _ = server.flush();
3248
3249        assert!(client_t.join().is_ok());
3250    }
3251
3252    /// Analogous to the above, but with TCP sockets.
3253    #[test]
3254    fn test_data_exchange_tcp() {
3255        use std::net::{TcpListener, TcpStream};
3256        use std::io::{Read, Write};
3257
3258        static TX_BUF: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
3259
3260        let listener = iotry!(TcpListener::bind("127.0.0.1:0"));
3261        let server_addr = iotry!(listener.local_addr());
3262
3263        let client_t = thread::spawn(move || {
3264            let mut client = iotry!(TcpStream::connect(server_addr));
3265            assert_eq!(iotry!(client.write(&TX_BUF)), TX_BUF.len());
3266            let mut buf = [0; 10];
3267            iotry!(client.read(&mut buf));
3268            assert_eq!(buf, TX_BUF);
3269        });
3270
3271        let mut server = iotry!(listener.accept()).0;
3272
3273        assert_eq!(iotry!(server.write(&TX_BUF)), TX_BUF.len());
3274        let mut buf = [0; 10];
3275        iotry!(server.read(&mut buf));
3276        assert_eq!(buf, TX_BUF);
3277
3278        assert!(client_t.join().is_ok());
3279    }
3280
3281}