utp2/
socket.rs

1use {util, TIMESTAMP_MASK};
2use delays::Delays;
3use in_queue::InQueue;
4use out_queue::OutQueue;
5use packet::{self, Packet};
6
7use mio::net::UdpSocket;
8use mio::{Evented, Registration, SetReadiness, Ready, Poll, PollOpt, Token};
9
10use bytes::{BytesMut, BufMut};
11use slab::Slab;
12
13use std::{cmp, io, u32};
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::net::SocketAddr;
17use std::collections::{HashMap, VecDeque};
18use std::time::{Duration, Instant};
19
20pub struct UtpSocket {
21    // Shared state
22    inner: InnerCell,
23}
24
25/// Manages the state for a single UTP connection
26pub struct UtpStream {
27    // Shared state
28    inner: InnerCell,
29
30    // Connection identifier
31    token: usize,
32
33    // Mio registration
34    registration: Registration,
35}
36
37pub struct UtpListener {
38    // Shared state
39    inner: InnerCell,
40
41    // Used to register interest in accepting UTP sockets
42    registration: Registration,
43}
44
45// Shared between the UtpSocket and each UtpStream
46struct Inner {
47    // State that needs to be passed to `Connection`. This is broken out to make
48    // the borrow checker happy.
49    shared: Shared,
50
51    // Connection specific state
52    connections: Slab<Connection>,
53
54    // Lookup a connection by key
55    connection_lookup: HashMap<Key, usize>,
56
57    // Buffer used for in-bound data
58    in_buf: BytesMut,
59
60    accept_buf: VecDeque<UtpStream>,
61
62    listener: SetReadiness,
63
64    listener_open: bool,
65}
66
67struct Shared {
68    // The UDP socket backing everything!
69    socket: UdpSocket,
70
71    // The current readiness of the socket, this is used when figuring out the
72    // readiness of each connection.
73    ready: Ready,
74
75    // Buffer used for out-bound data, this does not need to be share-able
76    out_buf: Vec<u8>,
77
78    // where to write the out_buf to
79    out_buf_dst: Option<SocketAddr>,
80}
81
82// Owned by UtpSocket
83#[derive(Debug)]
84struct Connection {
85    // Current socket state
86    state: State,
87
88    // A combination of the send ID and the socket address
89    key: Key,
90
91    // True when the `UtpStream` handle has been dropped
92    released: bool,
93
94    // Used to signal readiness on the `UtpStream`
95    set_readiness: SetReadiness,
96
97    // Queue of outbound packets. Packets will stay in the queue until the peer
98    // has acked them.
99    out_queue: OutQueue,
100
101    // Queue of inbound packets. The queue orders packets according to their
102    // sequence number.
103    in_queue: InQueue,
104
105    // Activity deadline
106    deadline: Option<Instant>,
107
108    // Tracks delays for the congestion control algorithm
109    our_delays: Delays,
110
111    their_delays: Delays,
112
113    last_maxed_out_window: Instant,
114    average_delay: i32,
115    current_delay_sum: i64,
116    current_delay_samples: i64,
117    average_delay_base: u32,
118    average_sample_time: Instant,
119    clock_drift: i32,
120    slow_start: bool,
121}
122
/// Lookup key identifying a connection: the connection ID we receive on plus
/// the peer's address.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct Key {
    /// Connection ID carried by packets addressed to this connection.
    receive_id: u16,
    /// Address of the remote peer.
    addr: SocketAddr,
}
128
/// Protocol state of a connection.
#[derive(Debug, Clone, Eq, PartialEq)]
enum State {
    /// We initiated the connection and are waiting for the peer to answer
    /// with a STATE packet.
    SynSent,
    /// A SYN was received and the STATE reply is sent right away, but the
    /// connection only becomes `Connected` once it has been accepted.
    SynRecv,
    /// The connection is fully established.
    Connected,
    /// A FIN has been sent and we are waiting for it to be acked before the
    /// connection is closed.
    FinSent,
    /// The remote reset the connection.
    Reset,
}
145
/// Reference-counted, interior-mutable handle to the socket-wide state.
type InnerCell = Rc<RefCell<Inner>>;
147
/// Minimum free space reserved in the receive buffer before each read.
const MIN_BUFFER_SIZE: usize = 4 * 1024;
/// Upper bound for buffer sizes.
const MAX_BUFFER_SIZE: usize = 64 * 1024;
/// Initial capacity of the socket's receive buffer.
const DEFAULT_IN_BUFFER_SIZE: usize = 64 * 1024;
/// Initial capacity of the socket's send scratch buffer.
const DEFAULT_OUT_BUFFER_SIZE: usize = 4 * 1024;
/// Maximum number of connections multiplexed over one socket.
const MAX_CONNECTIONS_PER_SOCKET: usize = 2 * 1024;
/// Initial activity timeout of a new outbound connection, in milliseconds.
const DEFAULT_TIMEOUT_MS: u64 = 1_000;
/// Target queuing delay for congestion control: 100ms, in microseconds.
const TARGET_DELAY: u32 = 100_000;

/// Window size used as the slow-start threshold.
const SLOW_START_THRESHOLD: usize = DEFAULT_IN_BUFFER_SIZE;
/// Largest congestion-window growth allowed per round trip, in bytes.
const MAX_CWND_INCREASE_BYTES_PER_RTT: usize = 3_000;
/// Smallest congestion window congestion control may settle on.
const MIN_WINDOW_SIZE: usize = 10;
/// Payload budget of a single packet (1400 bytes minus 20).
const MAX_DATA_SIZE: usize = 1_400 - 20;
160
161impl UtpSocket {
162    /// Bind a new `UtpSocket` to the given socket address
163    pub fn bind(addr: &SocketAddr) -> io::Result<(UtpSocket, UtpListener)> {
164        UdpSocket::bind(addr).map(UtpSocket::from_socket)
165    }
166
167    pub fn local_addr(&self) -> io::Result<SocketAddr> {
168        self.inner.borrow().shared.socket.local_addr()
169    }
170
171    /// Create a new `Utpsocket` backed by the provided `UdpSocket`.
172    pub fn from_socket(socket: UdpSocket) -> (UtpSocket, UtpListener) {
173        let (registration, set_readiness) = Registration::new2();
174
175        let inner = Rc::new(RefCell::new(Inner {
176            shared: Shared {
177                socket: socket,
178                ready: Ready::empty(),
179                out_buf: Vec::with_capacity(DEFAULT_OUT_BUFFER_SIZE),
180                out_buf_dst: None,
181            },
182            connections: Slab::new(),
183            connection_lookup: HashMap::new(),
184            in_buf: BytesMut::with_capacity(DEFAULT_IN_BUFFER_SIZE),
185            accept_buf: VecDeque::new(),
186            listener: set_readiness,
187            listener_open: true,
188        }));
189
190        let listener = UtpListener {
191            inner: inner.clone(),
192            registration: registration,
193        };
194
195        let socket = UtpSocket {
196            inner: inner,
197        };
198
199        (socket, listener)
200    }
201
202    /// Connect a new `UtpSocket` to the given remote socket address
203    pub fn connect(&self, addr: &SocketAddr) -> io::Result<UtpStream> {
204        self.inner.borrow_mut().connect(addr, &self.inner)
205    }
206
207    /// Called whenever the socket readiness changes
208    pub fn ready(&self, ready: Ready) -> io::Result<()> {
209        self.inner.borrow_mut().ready(ready, &self.inner)
210    }
211
212    /// This function should be called every 500ms
213    pub fn tick(&self) -> io::Result<()> {
214        self.inner.borrow_mut().tick()
215    }
216}
217
218impl Evented for UtpSocket {
219    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
220        -> io::Result<()>
221    {
222        self.inner.borrow_mut().shared.socket.register(poll, token, interest, opts)
223    }
224
225    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
226        -> io::Result<()>
227    {
228        self.inner.borrow_mut().shared.socket.reregister(poll, token, interest, opts)
229    }
230
231    fn deregister(&self, poll: &Poll) -> io::Result<()> {
232        self.inner.borrow_mut().shared.socket.deregister(poll)
233    }
234}
235
236impl UtpListener {
237    /// Receive a new inbound connection.
238    ///
239    /// This function will also advance the state of all associated connections.
240    pub fn accept(&self) -> io::Result<UtpStream> {
241        self.inner.borrow_mut().accept()
242    }
243}
244
245impl Drop for UtpListener {
246    fn drop(&mut self) {
247        let mut inner = self.inner.borrow_mut();
248        inner.listener_open = false;
249
250        // Empty the connection queue
251        while let Ok(_) = inner.accept() {}
252    }
253}
254
#[cfg(test)]
impl UtpListener {
    /// Test helper: whether the listener is currently flagged as readable.
    pub fn is_readable(&self) -> bool {
        self.inner.borrow().listener.readiness().is_readable()
    }
}
262
263impl Evented for UtpListener {
264    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
265        -> io::Result<()>
266    {
267        self.registration.register(poll, token, interest, opts)
268    }
269
270    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
271        -> io::Result<()>
272    {
273        self.registration.reregister(poll, token, interest, opts)
274    }
275
276    fn deregister(&self, poll: &Poll) -> io::Result<()> {
277        Evented::deregister(&self.registration, poll)
278    }
279}
280
281impl UtpStream {
282    pub fn local_addr(&self) -> io::Result<SocketAddr> {
283        let inner = self.inner.borrow();
284        inner.shared.socket.local_addr()
285    }
286
287    pub fn read(&self, dst: &mut [u8]) -> io::Result<usize> {
288        let mut inner = self.inner.borrow_mut();
289        let connection = &mut inner.connections[self.token];
290
291        match connection.in_queue.read(dst) {
292            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
293                if connection.state == State::Connected {
294                    try!(connection.update_readiness());
295                    Err(io::ErrorKind::WouldBlock.into())
296                } else if connection.state.is_closed() {
297                    if connection.state == State::Reset {
298                        Err(io::ErrorKind::ConnectionReset.into())
299                    } else {
300                        Ok(0)
301                    }
302                } else {
303                    unreachable!();
304                }
305            }
306            ret => {
307                connection.update_local_window();
308                ret
309            }
310        }
311    }
312
313    pub fn write(&self, src: &[u8]) -> io::Result<usize> {
314        self.inner.borrow_mut().write(self.token, src)
315    }
316}
317
#[cfg(test)]
impl UtpStream {
    /// Test helper: whether the stream is currently flagged as readable.
    pub fn is_readable(&self) -> bool {
        self.inner.borrow().connections[self.token]
            .set_readiness
            .readiness()
            .is_readable()
    }

    /// Test helper: whether the stream is currently flagged as writable.
    pub fn is_writable(&self) -> bool {
        self.inner.borrow().connections[self.token]
            .set_readiness
            .readiness()
            .is_writable()
    }
}
332
333impl Drop for UtpStream {
334    fn drop(&mut self) {
335        self.inner.borrow_mut().close(self.token);
336    }
337}
338
339impl Evented for UtpStream {
340    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
341        -> io::Result<()>
342    {
343        self.registration.register(poll, token, interest, opts)
344    }
345
346    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
347        -> io::Result<()>
348    {
349        self.registration.reregister(poll, token, interest, opts)
350    }
351
352    fn deregister(&self, poll: &Poll) -> io::Result<()> {
353        Evented::deregister(&self.registration, poll)
354    }
355}
356
357impl Inner {
358    fn accept(&mut self) -> io::Result<UtpStream> {
359        match self.accept_buf.pop_front() {
360            Some(socket) => {
361                let conn = &mut self.connections[socket.token];
362
363                if conn.state == State::SynRecv {
364                    conn.state = State::Connected;
365                } else if conn.state.is_closed() {
366                    // Connection is being closed, but there may be data in the
367                    // buffer...
368                } else {
369                    unreachable!();
370                }
371
372                try!(conn.update_readiness());
373
374                Ok(socket)
375            }
376            None => {
377                // Unset readiness
378                try!(self.listener.set_readiness(Ready::empty()));
379
380                Err(io::ErrorKind::WouldBlock.into())
381            }
382        }
383    }
384
385    fn write(&mut self, token: usize, src: &[u8]) -> io::Result<usize> {
386        let conn = &mut self.connections[token];
387
388        if conn.state != State::Connected {
389            assert!(conn.state.is_closed(),
390                    "expected closed state; actual={:?}", conn.state);
391
392            return Err(io::ErrorKind::BrokenPipe.into());
393        }
394
395        match conn.out_queue.write(src) {
396            Ok(n) => {
397                conn.flush(&mut self.shared);
398                try!(conn.update_readiness());
399                Ok(n)
400            }
401            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
402                conn.last_maxed_out_window = Instant::now();
403                try!(conn.update_readiness());
404                Err(io::ErrorKind::WouldBlock.into())
405            }
406            Err(e) => {
407                Err(e)
408            }
409        }
410    }
411
412    /// Connect a new `UtpSocket` to the given remote socket address
413    fn connect(&mut self, addr: &SocketAddr, inner: &InnerCell) -> io::Result<UtpStream> {
414        if self.connections.len() == MAX_CONNECTIONS_PER_SOCKET {
415            return Err(io::Error::new(io::ErrorKind::Other, "socket has max connections"));
416        }
417
418        debug_assert!(self.connections.len() < MAX_CONNECTIONS_PER_SOCKET);
419
420        // The peer establishing the connection picks the identifiers uses for
421        // the stream.
422        let (receive_id, mut send_id) = util::generate_sequential_identifiers();
423
424        let mut key = Key {
425            receive_id: receive_id,
426            addr: addr.clone()
427        };
428
429        // Because the IDs are randomly generated, there could already be an
430        // existing connection with the key, so sequentially scan until we hit a
431        // free slot.
432        while self.connection_lookup.contains_key(&key) {
433            key.receive_id += 1;
434            send_id += 1;
435        }
436
437        // SYN packet has seq_nr of 1
438        let mut out_queue = OutQueue::new(send_id, 0, None);
439
440        let mut packet = Packet::syn();
441        packet.set_connection_id(key.receive_id);
442
443        // Queue the syn packet
444        out_queue.push(packet);
445
446        let (registration, set_readiness) = Registration::new2();
447        let now = Instant::now();
448
449        let token = self.connections.insert(Connection {
450            state: State::SynSent,
451            key: key.clone(),
452            set_readiness: set_readiness,
453            out_queue: out_queue,
454            in_queue: InQueue::new(None),
455            our_delays: Delays::new(),
456            their_delays: Delays::new(),
457            released: false,
458            deadline: Some(now + Duration::from_millis(DEFAULT_TIMEOUT_MS)),
459            last_maxed_out_window: now,
460            average_delay: 0,
461            current_delay_sum: 0,
462            current_delay_samples: 0,
463            average_delay_base: 0,
464            average_sample_time: now,
465            clock_drift: 0,
466            slow_start: true,
467        });
468
469        // Track the connection in the lookup
470        self.connection_lookup.insert(key, token);
471
472        self.flush();
473
474        Ok(UtpStream {
475            inner: inner.clone(),
476            token: token,
477            registration: registration,
478        })
479    }
480
481    fn close(&mut self, token: usize) {
482        let finalized = {
483            let conn = &mut self.connections[token];
484            conn.released = true;
485            conn.send_fin(false, &mut self.shared);
486            conn.flush(&mut self.shared);
487            conn.is_finalized()
488        };
489
490        if finalized {
491            self.remove_connection(token);
492        }
493    }
494
495    fn ready(&mut self, ready: Ready, inner: &InnerCell) -> io::Result<()> {
496        trace!("ready; ready={:?}", ready);
497
498        // Update readiness
499        self.shared.update_ready(ready);
500
501        loop {
502            // Try to receive a packet
503            let (packet, addr) = match self.recv_from() {
504                Ok(v) => v,
505                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
506                    trace!("ready -> would block");
507                    break;
508                }
509                Err(e) => {
510                    trace!("recv_from; error={:?}", e);
511                    return Err(e);
512                }
513            };
514
515            trace!("recv_from; addr={:?}; packet={:?}", addr, packet);
516
517            match self.process(packet, addr, inner) {
518                Ok(_) => {}
519                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
520                    panic!("NOPE");
521                }
522                Err(e) => return Err(e),
523            }
524        }
525
526        self.flush();
527        Ok(())
528    }
529
530    fn tick(&mut self) -> io::Result<()> {
531        trace!("Socket::tick");
532        for &idx in self.connection_lookup.values() {
533            try!(self.connections[idx].tick(&mut self.shared));
534        }
535
536        Ok(())
537    }
538
539    fn process(&mut self,
540               packet: Packet,
541               addr: SocketAddr,
542               inner: &InnerCell) -> io::Result<()>
543    {
544        // Process the packet
545        match packet.ty() {
546            packet::Type::Syn => {
547                // SYN packets are special
548                self.process_syn(packet, addr, inner)
549            }
550            _ => {
551                // All other packets are associated with a connection, and as
552                // such they should be sequenced.
553                let key = Key::new(packet.connection_id(), addr);
554
555                match self.connection_lookup.get(&key) {
556                    Some(&token) => {
557                        let finalized = {
558                            let conn = &mut self.connections[token];
559                            try!(conn.process(packet, &mut self.shared))
560                        };
561
562                        if finalized {
563                            let conn = self.remove_connection(token);
564                        }
565
566                        Ok(())
567                    }
568                    None => {
569                        trace!("no connection associated with ID; dropping packet");
570
571                        // Send the RESET packet, ignoring errors...
572                        let mut p = Packet::reset();
573                        p.set_connection_id(packet.connection_id());
574
575                        let _ = self.shared.socket.send_to(p.as_slice(), &addr);
576
577                        return Ok(());
578                    }
579                }
580            }
581        }
582    }
583
584    fn process_syn(&mut self,
585                   packet: Packet,
586                   addr: SocketAddr,
587                   inner: &InnerCell) -> io::Result<()>
588    {
589        if !self.listener_open {
590            // Send the RESET packet, ignoring errors...
591            let mut p = Packet::reset();
592            p.set_connection_id(packet.connection_id());
593
594            let _ = self.shared.socket.send_to(p.as_slice(), &addr);
595
596            return Ok(());
597        }
598
599        let seq_nr = util::rand();
600        let ack_nr = packet.seq_nr();
601        let send_id = packet.connection_id();
602        let receive_id = send_id + 1;
603
604        // TODO: If accept buffer is full, reset the connection
605
606        let key = Key {
607            receive_id: receive_id,
608            addr: addr,
609        };
610
611        if self.connection_lookup.contains_key(&key) {
612            // Just ignore the packet...
613            return Ok(());
614        }
615
616        let (registration, set_readiness) = Registration::new2();
617
618        let now = Instant::now();
619
620        let mut connection = Connection {
621            state: State::SynRecv,
622            key: key.clone(),
623            set_readiness: set_readiness,
624            out_queue: OutQueue::new(send_id, seq_nr, Some(ack_nr)),
625            in_queue: InQueue::new(Some(ack_nr)),
626            released: false,
627            our_delays: Delays::new(),
628            their_delays: Delays::new(),
629            deadline: None,
630            last_maxed_out_window: now,
631            average_delay: 0,
632            current_delay_sum: 0,
633            current_delay_samples: 0,
634            average_delay_base: 0,
635            average_sample_time: now,
636            clock_drift: 0,
637            slow_start: true,
638        };
639
640        // This will handle the state packet being sent
641        connection.flush(&mut self.shared);
642
643        let token = self.connections.insert(connection);
644        self.connection_lookup.insert(key, token);
645
646        // Store the connection in the accept buffer
647        self.accept_buf.push_back(UtpStream {
648            inner: inner.clone(),
649            token: token,
650            registration: registration,
651        });
652
653        // Notify the listener
654        try!(self.listener.set_readiness(Ready::readable()));
655
656        return Ok(());
657    }
658
659    fn recv_from(&mut self) -> io::Result<(Packet, SocketAddr)> {
660        // Ensure the buffer has at least 4kb of available space.
661        self.in_buf.reserve(MIN_BUFFER_SIZE);
662
663        // Read in the bytes
664        let addr = unsafe {
665            let (n, addr) = try!(self.shared.socket.recv_from(self.in_buf.bytes_mut()));
666            self.in_buf.advance_mut(n);
667            addr
668        };
669
670        // Try loading the header
671        let packet = try!(Packet::parse(self.in_buf.take()));
672
673        Ok((packet, addr))
674    }
675
676    fn flush(&mut self) {
677        // TODO: Make this smarter!
678
679        for &token in self.connection_lookup.values() {
680            if !self.shared.is_writable() {
681                return;
682            }
683
684            let conn = &mut self.connections[token];
685            conn.flush(&mut self.shared);
686        }
687    }
688
689    fn remove_connection(&mut self, token: usize) {
690        let connection = self.connections.remove(token);
691        self.connection_lookup.remove(&connection.key);
692        trace!("removing connection state; token={:?}, addr={:?}; id={:?}",
693               token, connection.key.addr, connection.key.receive_id);
694    }
695}
696
697impl Shared {
698    fn update_ready(&mut self, ready: Ready) {
699        self.ready = self.ready | ready;
700    }
701
702    fn is_writable(&self) -> bool {
703        self.ready.is_writable()
704    }
705
706    fn need_writable(&mut self) {
707        self.ready.remove(Ready::writable());
708    }
709}
710
711impl Connection {
712    fn update_local_window(&mut self) {
713        self.out_queue.set_local_window(self.in_queue.local_window());
714    }
715
716    /// Process an inbound packet for the connection
717    fn process(&mut self, packet: Packet, shared: &mut Shared) -> io::Result<bool> {
718        let now = Instant::now();
719
720        if self.state == State::Reset {
721            return Ok(self.is_finalized());
722        }
723
724        if packet.ty() == packet::Type::Reset {
725            self.state = State::Reset;
726
727            // Update readiness
728            try!(self.update_readiness());
729
730            return Ok(self.is_finalized());
731        }
732
733        // TODO: Invalid packets should be discarded here.
734
735        self.update_delays(now, &packet);
736
737        if packet.ty() == packet::Type::State {
738            // State packets are special, they do not have an associated
739            // sequence number, thus do not require ordering. They are only used
740            // to ACK packets, which is handled above, and to transition a
741            // connection into the connected state.
742            if self.state == State::SynSent {
743                self.in_queue.set_initial_ack_nr(packet.seq_nr());
744                self.out_queue.set_local_ack(packet.seq_nr());
745                self.out_queue.set_peer_window(packet.wnd_size());
746
747                self.state = State::Connected;
748            }
749        } else {
750            // TODO: validate the packet's ack_nr
751
752            // Add the packet to the inbound queue. This handles ordering
753            trace!("inqueue -- push packet");
754            if !self.in_queue.push(packet) {
755                // Invalid packet, avoid any further processing
756                trace!("invalid packet");
757                return Ok(false);
758            }
759        }
760
761        // TODO: count duplicate ACK counter
762
763        trace!("polling from in_queue");
764
765        while let Some(packet) = self.in_queue.poll() {
766            trace!("process; packet={:?}; state={:?}", packet, self.state);
767
768            // Update the peer window size
769            self.out_queue.set_peer_window(packet.wnd_size());
770
771            // At this point, we only receive CTL frames. Data is held in the
772            // queue
773            match packet.ty() {
774                packet::Type::Reset => {
775                    self.state = State::Reset;
776                }
777                packet::Type::Fin => {
778                    self.send_fin(true, shared);
779                }
780                packet::Type::Data |
781                    packet::Type::Syn |
782                    packet::Type::State => unreachable!(),
783            }
784        }
785
786        trace!("updating local window, acks; window={:?}; ack={:?}",
787               self.in_queue.local_window(),
788               self.in_queue.ack_nr());
789
790        self.update_local_window();
791        self.out_queue.set_local_ack(self.in_queue.ack_nr());
792
793        // Reset the timeout
794        self.reset_timeout();
795
796        // Flush out queue
797        self.flush(shared);
798
799        // Update readiness
800        try!(self.update_readiness());
801
802        Ok(self.is_finalized())
803    }
804
805    fn flush(&mut self, shared: &mut Shared) {
806        let mut sent = false;
807
808        if self.state == State::Reset {
809            return;
810        }
811
812        while let Some(next) = self.out_queue.next() {
813            if !shared.is_writable() {
814                return;
815            }
816
817            trace!("send_to; addr={:?}; packet={:?}", self.key.addr, next.packet());
818
819            match shared.socket.send_to(next.packet().as_slice(), &self.key.addr) {
820                Ok(n) => {
821                    assert_eq!(n, next.packet().as_slice().len());
822                    next.sent();
823
824                    // Reset the connection timeout
825                    sent = true;
826                }
827                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
828                    shared.need_writable();
829                    return;
830                }
831                Err(e) => {
832                    panic!("TODO: implement error handling {:?}", e);
833                }
834            }
835        }
836
837        if sent {
838            self.reset_timeout();
839        }
840    }
841
842    fn tick(&mut self, shared: &mut Shared) -> io::Result<()> {
843        if self.state == State::Reset {
844            return Ok(());
845        }
846
847        if let Some(deadline) = self.deadline {
848            if Instant::now() >= deadline {
849                trace!("connection timed out; id={}", self.out_queue.connection_id());
850                self.out_queue.timed_out();
851                self.flush(shared);
852            }
853        }
854
855        Ok(())
856    }
857
858    fn update_delays(&mut self, now: Instant, packet: &Packet) {
859        let mut actual_delay = u32::MAX;
860
861        if packet.timestamp() > 0 {
862            // Use the packet to update the delay value
863            let their_delay = self.out_queue.update_their_delay(packet.timestamp());
864            let prev_base_delay = self.their_delays.base_delay();
865
866            // Track the delay
867            self.their_delays.add_sample(their_delay, now);
868
869            if let Some(prev) = prev_base_delay {
870                let new = self.their_delays.base_delay().unwrap();
871
872                // If their new base delay is less than their previous one, we
873                // should shift our delay base in the other direction in order
874                // to take the clock skew into account.
875                let lt = util::wrapping_lt(new, prev, TIMESTAMP_MASK);
876                let diff = prev.wrapping_sub(new);
877
878                if lt && diff <= 10_000 {
879                    self.our_delays.shift(diff);
880                }
881            }
882
883            actual_delay = packet.timestamp_diff();
884
885            if actual_delay != u32::MAX {
886                self.our_delays.add_sample(actual_delay, now);
887
888                if self.average_delay_base == 0 {
889                    self.average_delay_base = actual_delay;
890                }
891
892                let average_delay_sample;
893                let dist_down = self.average_delay_base.wrapping_sub(actual_delay);
894                let dist_up = actual_delay.wrapping_sub(self.average_delay_base);
895
896                if dist_down > dist_up {
897                    average_delay_sample = dist_up as i64;
898                } else {
899                    average_delay_sample = -(dist_down as i64);
900                }
901
902                self.current_delay_sum = self.current_delay_sum.wrapping_add(average_delay_sample);
903                self.current_delay_samples += 1;
904
905                if now > self.average_sample_time {
906                    let mut prev_average_delay = self.average_delay;
907                    self.average_delay = (self.current_delay_sum / self.current_delay_samples) as i32;
908                    self.average_sample_time = now + Duration::from_secs(5);
909
910                    self.current_delay_sum = 0;
911                    self.current_delay_samples = 0;
912
913                    let min_sample = cmp::min(prev_average_delay, self.average_delay);
914                    let max_sample = cmp::max(prev_average_delay, self.average_delay);
915
916                    if min_sample > 0 {
917                        self.average_delay_base += min_sample as u32;
918                        self.average_delay -= min_sample;
919                        prev_average_delay -= min_sample;
920                    } else if max_sample < 0 {
921                        let adjust = -max_sample;
922
923                        self.average_delay_base -= adjust as u32;
924                        self.average_delay += adjust;
925                        prev_average_delay += adjust;
926                    }
927
928                    // Update the clock drive estimate
929                    let drift = self.average_delay as i64 - prev_average_delay as i64;
930
931                    self.clock_drift = ((self.clock_drift as i64 * 7 + drift) / 8) as i32;
932                }
933            }
934        }
935
936        // Ack all packets
937        if let Some((acked_bytes, min_rtt)) = self.out_queue.set_their_ack(packet.ack_nr(), now) {
938            let min_rtt = util::as_wrapping_micros(min_rtt);
939
940            if let Some(delay) = self.our_delays.get() {
941                if delay > min_rtt {
942                    self.our_delays.shift(delay.wrapping_sub(min_rtt));
943                }
944            }
945
946            if actual_delay != u32::MAX && acked_bytes >= 1 {
947                self.apply_congestion_control(acked_bytes, actual_delay, min_rtt, now);
948            }
949        }
950    }
951
952    fn apply_congestion_control(&mut self,
953                                bytes_acked: usize,
954                                actual_delay: u32,
955                                min_rtt: u32,
956                                now: Instant)
957    {
958        trace!("applying congenstion control; bytes_acked={}; actual_delay={}; min_rtt={}",
959               bytes_acked, actual_delay, min_rtt);
960
961        let target = TARGET_DELAY;
962
963        let mut our_delay = cmp::min(self.our_delays.get().unwrap(), min_rtt);
964        let max_window = self.out_queue.max_window() as usize;
965
966        if self.clock_drift < -200_000 {
967            let penalty = (-self.clock_drift - 200_000) / 7;
968
969            if penalty > 0 {
970                our_delay += penalty as u32;
971            } else {
972                our_delay -= (-penalty) as u32;
973            }
974        }
975
976        let off_target = (target - our_delay) as f64;
977        let window_factor =
978            cmp::min(bytes_acked, max_window) as f64 /
979            cmp::max(max_window, bytes_acked) as f64;
980
981        let delay_factor = off_target / target as f64;
982        let mut scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT as f64 *
983            window_factor * delay_factor;
984
985        if scaled_gain > 0.0 && now - self.last_maxed_out_window > Duration::from_secs(1) {
986            // if it was more than 1 second since we tried to send a packet and
987            // stopped because we hit the max window, we're most likely rate
988            // limited (which prevents us from ever hitting the window size) if
989            // this is the case, we cannot let the max_window grow indefinitely
990            scaled_gain = 0.0;
991        }
992
993        let ledbat_cwnd = if max_window + (scaled_gain as usize) < MIN_WINDOW_SIZE {
994            MIN_WINDOW_SIZE
995        } else {
996            max_window + scaled_gain as usize
997        };
998
999        if self.slow_start {
1000            let ss_cwnd = max_window + window_factor as usize * MAX_DATA_SIZE;
1001
1002            if ss_cwnd > SLOW_START_THRESHOLD {
1003                self.slow_start = false;
1004            } else if our_delay > (target as f64 * 0.9) as u32 {
1005                // Even if we're a little under the target delay, we
1006                // conservatively discontinue the slow start phase
1007                self.slow_start = false;
1008            } else {
1009                self.out_queue.set_max_window(cmp::max(ss_cwnd, ledbat_cwnd) as u32);
1010            }
1011        } else {
1012            self.out_queue.set_max_window(ledbat_cwnd as u32);
1013        }
1014    }
1015
1016    fn reset_timeout(&mut self) {
1017        self.deadline = self.out_queue.socket_timeout()
1018            .map(|dur| {
1019                trace!("resetting timeout; duration={:?}", dur);
1020                Instant::now() + dur
1021            });
1022    }
1023
1024    fn send_fin(&mut self, _: bool, shared: &mut Shared) {
1025        if self.state.is_closed() {
1026            return;
1027        }
1028
1029        self.out_queue.push(Packet::fin());
1030        self.state = State::FinSent;
1031    }
1032
1033    fn is_finalized(&self) -> bool {
1034        self.released &&
1035            ((self.out_queue.is_empty() && self.state.is_closed()) ||
1036             self.state == State::Reset)
1037    }
1038
1039    /// Update the UtpStream's readiness
1040    fn update_readiness(&self) -> io::Result<()> {
1041        let mut ready = Ready::empty();
1042
1043        if self.state == State::Connected {
1044            if self.is_readable() {
1045                ready.insert(Ready::readable());
1046            }
1047
1048            if self.is_writable() {
1049                ready.insert(Ready::writable());
1050            }
1051        } else if self.state.is_closed() {
1052            ready = Ready::readable();
1053        }
1054
1055        trace!("updating socket readiness; ready={:?}", ready);
1056
1057        self.set_readiness.set_readiness(ready)
1058    }
1059
1060    // =========
1061
    /// Whether the connection has data to read; delegates to
    /// `in_queue.is_readable()`.
    fn is_readable(&self) -> bool {
        self.in_queue.is_readable()
    }
1065
    /// Whether the connection can accept more outgoing data; delegates to
    /// `out_queue.is_writable()`.
    fn is_writable(&self) -> bool {
        self.out_queue.is_writable()
    }
1069}
1070
1071impl State {
1072    fn is_closed(&self) -> bool {
1073        match *self {
1074            State::FinSent | State::Reset => true,
1075            _ => false,
1076        }
1077    }
1078}
1079
1080impl Key {
1081    fn new(receive_id: u16, addr: SocketAddr) -> Key {
1082        Key {
1083            receive_id: receive_id,
1084            addr: addr,
1085        }
1086    }
1087}