1use {util, TIMESTAMP_MASK};
2use delays::Delays;
3use in_queue::InQueue;
4use out_queue::OutQueue;
5use packet::{self, Packet};
6
7use mio::net::UdpSocket;
8use mio::{Evented, Registration, SetReadiness, Ready, Poll, PollOpt, Token};
9
10use bytes::{BytesMut, BufMut};
11use slab::Slab;
12
13use std::{cmp, io, u32};
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::net::SocketAddr;
17use std::collections::{HashMap, VecDeque};
18use std::time::{Duration, Instant};
19
20pub struct UtpSocket {
21 inner: InnerCell,
23}
24
25pub struct UtpStream {
27 inner: InnerCell,
29
30 token: usize,
32
33 registration: Registration,
35}
36
37pub struct UtpListener {
38 inner: InnerCell,
40
41 registration: Registration,
43}
44
45struct Inner {
47 shared: Shared,
50
51 connections: Slab<Connection>,
53
54 connection_lookup: HashMap<Key, usize>,
56
57 in_buf: BytesMut,
59
60 accept_buf: VecDeque<UtpStream>,
61
62 listener: SetReadiness,
63
64 listener_open: bool,
65}
66
67struct Shared {
68 socket: UdpSocket,
70
71 ready: Ready,
74
75 out_buf: Vec<u8>,
77
78 out_buf_dst: Option<SocketAddr>,
80}
81
82#[derive(Debug)]
84struct Connection {
85 state: State,
87
88 key: Key,
90
91 released: bool,
93
94 set_readiness: SetReadiness,
96
97 out_queue: OutQueue,
100
101 in_queue: InQueue,
104
105 deadline: Option<Instant>,
107
108 our_delays: Delays,
110
111 their_delays: Delays,
112
113 last_maxed_out_window: Instant,
114 average_delay: i32,
115 current_delay_sum: i64,
116 current_delay_samples: i64,
117 average_delay_base: u32,
118 average_sample_time: Instant,
119 clock_drift: i32,
120 slow_start: bool,
121}
122
123#[derive(Debug, Clone, Eq, PartialEq, Hash)]
124struct Key {
125 receive_id: u16,
126 addr: SocketAddr,
127}
128
129#[derive(Debug, Clone, Eq, PartialEq)]
130enum State {
131 SynSent,
134 SynRecv,
137 Connected,
139 FinSent,
142 Reset,
144}
145
146type InnerCell = Rc<RefCell<Inner>>;
147
148const MIN_BUFFER_SIZE: usize = 4 * 1_024;
149const MAX_BUFFER_SIZE: usize = 64 * 1_024;
150const DEFAULT_IN_BUFFER_SIZE: usize = 64 * 1024;
151const DEFAULT_OUT_BUFFER_SIZE: usize = 4 * 1024;
152const MAX_CONNECTIONS_PER_SOCKET: usize = 2 * 1024;
153const DEFAULT_TIMEOUT_MS: u64 = 1_000;
154const TARGET_DELAY: u32 = 100_000; const SLOW_START_THRESHOLD: usize = DEFAULT_IN_BUFFER_SIZE;
157const MAX_CWND_INCREASE_BYTES_PER_RTT: usize = 3000;
158const MIN_WINDOW_SIZE: usize = 10;
159const MAX_DATA_SIZE: usize = 1_400 - 20;
160
161impl UtpSocket {
162 pub fn bind(addr: &SocketAddr) -> io::Result<(UtpSocket, UtpListener)> {
164 UdpSocket::bind(addr).map(UtpSocket::from_socket)
165 }
166
167 pub fn local_addr(&self) -> io::Result<SocketAddr> {
168 self.inner.borrow().shared.socket.local_addr()
169 }
170
171 pub fn from_socket(socket: UdpSocket) -> (UtpSocket, UtpListener) {
173 let (registration, set_readiness) = Registration::new2();
174
175 let inner = Rc::new(RefCell::new(Inner {
176 shared: Shared {
177 socket: socket,
178 ready: Ready::empty(),
179 out_buf: Vec::with_capacity(DEFAULT_OUT_BUFFER_SIZE),
180 out_buf_dst: None,
181 },
182 connections: Slab::new(),
183 connection_lookup: HashMap::new(),
184 in_buf: BytesMut::with_capacity(DEFAULT_IN_BUFFER_SIZE),
185 accept_buf: VecDeque::new(),
186 listener: set_readiness,
187 listener_open: true,
188 }));
189
190 let listener = UtpListener {
191 inner: inner.clone(),
192 registration: registration,
193 };
194
195 let socket = UtpSocket {
196 inner: inner,
197 };
198
199 (socket, listener)
200 }
201
202 pub fn connect(&self, addr: &SocketAddr) -> io::Result<UtpStream> {
204 self.inner.borrow_mut().connect(addr, &self.inner)
205 }
206
207 pub fn ready(&self, ready: Ready) -> io::Result<()> {
209 self.inner.borrow_mut().ready(ready, &self.inner)
210 }
211
212 pub fn tick(&self) -> io::Result<()> {
214 self.inner.borrow_mut().tick()
215 }
216}
217
218impl Evented for UtpSocket {
219 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
220 -> io::Result<()>
221 {
222 self.inner.borrow_mut().shared.socket.register(poll, token, interest, opts)
223 }
224
225 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
226 -> io::Result<()>
227 {
228 self.inner.borrow_mut().shared.socket.reregister(poll, token, interest, opts)
229 }
230
231 fn deregister(&self, poll: &Poll) -> io::Result<()> {
232 self.inner.borrow_mut().shared.socket.deregister(poll)
233 }
234}
235
236impl UtpListener {
237 pub fn accept(&self) -> io::Result<UtpStream> {
241 self.inner.borrow_mut().accept()
242 }
243}
244
245impl Drop for UtpListener {
246 fn drop(&mut self) {
247 let mut inner = self.inner.borrow_mut();
248 inner.listener_open = false;
249
250 while let Ok(_) = inner.accept() {}
252 }
253}
254
255#[cfg(test)]
256impl UtpListener {
257 pub fn is_readable(&self) -> bool {
258 let inner = self.inner.borrow();
259 inner.listener.readiness().is_readable()
260 }
261}
262
263impl Evented for UtpListener {
264 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
265 -> io::Result<()>
266 {
267 self.registration.register(poll, token, interest, opts)
268 }
269
270 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
271 -> io::Result<()>
272 {
273 self.registration.reregister(poll, token, interest, opts)
274 }
275
276 fn deregister(&self, poll: &Poll) -> io::Result<()> {
277 Evented::deregister(&self.registration, poll)
278 }
279}
280
281impl UtpStream {
282 pub fn local_addr(&self) -> io::Result<SocketAddr> {
283 let inner = self.inner.borrow();
284 inner.shared.socket.local_addr()
285 }
286
287 pub fn read(&self, dst: &mut [u8]) -> io::Result<usize> {
288 let mut inner = self.inner.borrow_mut();
289 let connection = &mut inner.connections[self.token];
290
291 match connection.in_queue.read(dst) {
292 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
293 if connection.state == State::Connected {
294 try!(connection.update_readiness());
295 Err(io::ErrorKind::WouldBlock.into())
296 } else if connection.state.is_closed() {
297 if connection.state == State::Reset {
298 Err(io::ErrorKind::ConnectionReset.into())
299 } else {
300 Ok(0)
301 }
302 } else {
303 unreachable!();
304 }
305 }
306 ret => {
307 connection.update_local_window();
308 ret
309 }
310 }
311 }
312
313 pub fn write(&self, src: &[u8]) -> io::Result<usize> {
314 self.inner.borrow_mut().write(self.token, src)
315 }
316}
317
318#[cfg(test)]
319impl UtpStream {
320 pub fn is_readable(&self) -> bool {
321 let inner = self.inner.borrow();
322 let connection = &inner.connections[self.token];
323 connection.set_readiness.readiness().is_readable()
324 }
325
326 pub fn is_writable(&self) -> bool {
327 let inner = self.inner.borrow();
328 let connection = &inner.connections[self.token];
329 connection.set_readiness.readiness().is_writable()
330 }
331}
332
333impl Drop for UtpStream {
334 fn drop(&mut self) {
335 self.inner.borrow_mut().close(self.token);
336 }
337}
338
339impl Evented for UtpStream {
340 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
341 -> io::Result<()>
342 {
343 self.registration.register(poll, token, interest, opts)
344 }
345
346 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
347 -> io::Result<()>
348 {
349 self.registration.reregister(poll, token, interest, opts)
350 }
351
352 fn deregister(&self, poll: &Poll) -> io::Result<()> {
353 Evented::deregister(&self.registration, poll)
354 }
355}
356
357impl Inner {
358 fn accept(&mut self) -> io::Result<UtpStream> {
359 match self.accept_buf.pop_front() {
360 Some(socket) => {
361 let conn = &mut self.connections[socket.token];
362
363 if conn.state == State::SynRecv {
364 conn.state = State::Connected;
365 } else if conn.state.is_closed() {
366 } else {
369 unreachable!();
370 }
371
372 try!(conn.update_readiness());
373
374 Ok(socket)
375 }
376 None => {
377 try!(self.listener.set_readiness(Ready::empty()));
379
380 Err(io::ErrorKind::WouldBlock.into())
381 }
382 }
383 }
384
385 fn write(&mut self, token: usize, src: &[u8]) -> io::Result<usize> {
386 let conn = &mut self.connections[token];
387
388 if conn.state != State::Connected {
389 assert!(conn.state.is_closed(),
390 "expected closed state; actual={:?}", conn.state);
391
392 return Err(io::ErrorKind::BrokenPipe.into());
393 }
394
395 match conn.out_queue.write(src) {
396 Ok(n) => {
397 conn.flush(&mut self.shared);
398 try!(conn.update_readiness());
399 Ok(n)
400 }
401 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
402 conn.last_maxed_out_window = Instant::now();
403 try!(conn.update_readiness());
404 Err(io::ErrorKind::WouldBlock.into())
405 }
406 Err(e) => {
407 Err(e)
408 }
409 }
410 }
411
412 fn connect(&mut self, addr: &SocketAddr, inner: &InnerCell) -> io::Result<UtpStream> {
414 if self.connections.len() == MAX_CONNECTIONS_PER_SOCKET {
415 return Err(io::Error::new(io::ErrorKind::Other, "socket has max connections"));
416 }
417
418 debug_assert!(self.connections.len() < MAX_CONNECTIONS_PER_SOCKET);
419
420 let (receive_id, mut send_id) = util::generate_sequential_identifiers();
423
424 let mut key = Key {
425 receive_id: receive_id,
426 addr: addr.clone()
427 };
428
429 while self.connection_lookup.contains_key(&key) {
433 key.receive_id += 1;
434 send_id += 1;
435 }
436
437 let mut out_queue = OutQueue::new(send_id, 0, None);
439
440 let mut packet = Packet::syn();
441 packet.set_connection_id(key.receive_id);
442
443 out_queue.push(packet);
445
446 let (registration, set_readiness) = Registration::new2();
447 let now = Instant::now();
448
449 let token = self.connections.insert(Connection {
450 state: State::SynSent,
451 key: key.clone(),
452 set_readiness: set_readiness,
453 out_queue: out_queue,
454 in_queue: InQueue::new(None),
455 our_delays: Delays::new(),
456 their_delays: Delays::new(),
457 released: false,
458 deadline: Some(now + Duration::from_millis(DEFAULT_TIMEOUT_MS)),
459 last_maxed_out_window: now,
460 average_delay: 0,
461 current_delay_sum: 0,
462 current_delay_samples: 0,
463 average_delay_base: 0,
464 average_sample_time: now,
465 clock_drift: 0,
466 slow_start: true,
467 });
468
469 self.connection_lookup.insert(key, token);
471
472 self.flush();
473
474 Ok(UtpStream {
475 inner: inner.clone(),
476 token: token,
477 registration: registration,
478 })
479 }
480
481 fn close(&mut self, token: usize) {
482 let finalized = {
483 let conn = &mut self.connections[token];
484 conn.released = true;
485 conn.send_fin(false, &mut self.shared);
486 conn.flush(&mut self.shared);
487 conn.is_finalized()
488 };
489
490 if finalized {
491 self.remove_connection(token);
492 }
493 }
494
495 fn ready(&mut self, ready: Ready, inner: &InnerCell) -> io::Result<()> {
496 trace!("ready; ready={:?}", ready);
497
498 self.shared.update_ready(ready);
500
501 loop {
502 let (packet, addr) = match self.recv_from() {
504 Ok(v) => v,
505 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
506 trace!("ready -> would block");
507 break;
508 }
509 Err(e) => {
510 trace!("recv_from; error={:?}", e);
511 return Err(e);
512 }
513 };
514
515 trace!("recv_from; addr={:?}; packet={:?}", addr, packet);
516
517 match self.process(packet, addr, inner) {
518 Ok(_) => {}
519 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
520 panic!("NOPE");
521 }
522 Err(e) => return Err(e),
523 }
524 }
525
526 self.flush();
527 Ok(())
528 }
529
530 fn tick(&mut self) -> io::Result<()> {
531 trace!("Socket::tick");
532 for &idx in self.connection_lookup.values() {
533 try!(self.connections[idx].tick(&mut self.shared));
534 }
535
536 Ok(())
537 }
538
539 fn process(&mut self,
540 packet: Packet,
541 addr: SocketAddr,
542 inner: &InnerCell) -> io::Result<()>
543 {
544 match packet.ty() {
546 packet::Type::Syn => {
547 self.process_syn(packet, addr, inner)
549 }
550 _ => {
551 let key = Key::new(packet.connection_id(), addr);
554
555 match self.connection_lookup.get(&key) {
556 Some(&token) => {
557 let finalized = {
558 let conn = &mut self.connections[token];
559 try!(conn.process(packet, &mut self.shared))
560 };
561
562 if finalized {
563 let conn = self.remove_connection(token);
564 }
565
566 Ok(())
567 }
568 None => {
569 trace!("no connection associated with ID; dropping packet");
570
571 let mut p = Packet::reset();
573 p.set_connection_id(packet.connection_id());
574
575 let _ = self.shared.socket.send_to(p.as_slice(), &addr);
576
577 return Ok(());
578 }
579 }
580 }
581 }
582 }
583
584 fn process_syn(&mut self,
585 packet: Packet,
586 addr: SocketAddr,
587 inner: &InnerCell) -> io::Result<()>
588 {
589 if !self.listener_open {
590 let mut p = Packet::reset();
592 p.set_connection_id(packet.connection_id());
593
594 let _ = self.shared.socket.send_to(p.as_slice(), &addr);
595
596 return Ok(());
597 }
598
599 let seq_nr = util::rand();
600 let ack_nr = packet.seq_nr();
601 let send_id = packet.connection_id();
602 let receive_id = send_id + 1;
603
604 let key = Key {
607 receive_id: receive_id,
608 addr: addr,
609 };
610
611 if self.connection_lookup.contains_key(&key) {
612 return Ok(());
614 }
615
616 let (registration, set_readiness) = Registration::new2();
617
618 let now = Instant::now();
619
620 let mut connection = Connection {
621 state: State::SynRecv,
622 key: key.clone(),
623 set_readiness: set_readiness,
624 out_queue: OutQueue::new(send_id, seq_nr, Some(ack_nr)),
625 in_queue: InQueue::new(Some(ack_nr)),
626 released: false,
627 our_delays: Delays::new(),
628 their_delays: Delays::new(),
629 deadline: None,
630 last_maxed_out_window: now,
631 average_delay: 0,
632 current_delay_sum: 0,
633 current_delay_samples: 0,
634 average_delay_base: 0,
635 average_sample_time: now,
636 clock_drift: 0,
637 slow_start: true,
638 };
639
640 connection.flush(&mut self.shared);
642
643 let token = self.connections.insert(connection);
644 self.connection_lookup.insert(key, token);
645
646 self.accept_buf.push_back(UtpStream {
648 inner: inner.clone(),
649 token: token,
650 registration: registration,
651 });
652
653 try!(self.listener.set_readiness(Ready::readable()));
655
656 return Ok(());
657 }
658
659 fn recv_from(&mut self) -> io::Result<(Packet, SocketAddr)> {
660 self.in_buf.reserve(MIN_BUFFER_SIZE);
662
663 let addr = unsafe {
665 let (n, addr) = try!(self.shared.socket.recv_from(self.in_buf.bytes_mut()));
666 self.in_buf.advance_mut(n);
667 addr
668 };
669
670 let packet = try!(Packet::parse(self.in_buf.take()));
672
673 Ok((packet, addr))
674 }
675
676 fn flush(&mut self) {
677 for &token in self.connection_lookup.values() {
680 if !self.shared.is_writable() {
681 return;
682 }
683
684 let conn = &mut self.connections[token];
685 conn.flush(&mut self.shared);
686 }
687 }
688
689 fn remove_connection(&mut self, token: usize) {
690 let connection = self.connections.remove(token);
691 self.connection_lookup.remove(&connection.key);
692 trace!("removing connection state; token={:?}, addr={:?}; id={:?}",
693 token, connection.key.addr, connection.key.receive_id);
694 }
695}
696
697impl Shared {
698 fn update_ready(&mut self, ready: Ready) {
699 self.ready = self.ready | ready;
700 }
701
702 fn is_writable(&self) -> bool {
703 self.ready.is_writable()
704 }
705
706 fn need_writable(&mut self) {
707 self.ready.remove(Ready::writable());
708 }
709}
710
711impl Connection {
712 fn update_local_window(&mut self) {
713 self.out_queue.set_local_window(self.in_queue.local_window());
714 }
715
716 fn process(&mut self, packet: Packet, shared: &mut Shared) -> io::Result<bool> {
718 let now = Instant::now();
719
720 if self.state == State::Reset {
721 return Ok(self.is_finalized());
722 }
723
724 if packet.ty() == packet::Type::Reset {
725 self.state = State::Reset;
726
727 try!(self.update_readiness());
729
730 return Ok(self.is_finalized());
731 }
732
733 self.update_delays(now, &packet);
736
737 if packet.ty() == packet::Type::State {
738 if self.state == State::SynSent {
743 self.in_queue.set_initial_ack_nr(packet.seq_nr());
744 self.out_queue.set_local_ack(packet.seq_nr());
745 self.out_queue.set_peer_window(packet.wnd_size());
746
747 self.state = State::Connected;
748 }
749 } else {
750 trace!("inqueue -- push packet");
754 if !self.in_queue.push(packet) {
755 trace!("invalid packet");
757 return Ok(false);
758 }
759 }
760
761 trace!("polling from in_queue");
764
765 while let Some(packet) = self.in_queue.poll() {
766 trace!("process; packet={:?}; state={:?}", packet, self.state);
767
768 self.out_queue.set_peer_window(packet.wnd_size());
770
771 match packet.ty() {
774 packet::Type::Reset => {
775 self.state = State::Reset;
776 }
777 packet::Type::Fin => {
778 self.send_fin(true, shared);
779 }
780 packet::Type::Data |
781 packet::Type::Syn |
782 packet::Type::State => unreachable!(),
783 }
784 }
785
786 trace!("updating local window, acks; window={:?}; ack={:?}",
787 self.in_queue.local_window(),
788 self.in_queue.ack_nr());
789
790 self.update_local_window();
791 self.out_queue.set_local_ack(self.in_queue.ack_nr());
792
793 self.reset_timeout();
795
796 self.flush(shared);
798
799 try!(self.update_readiness());
801
802 Ok(self.is_finalized())
803 }
804
805 fn flush(&mut self, shared: &mut Shared) {
806 let mut sent = false;
807
808 if self.state == State::Reset {
809 return;
810 }
811
812 while let Some(next) = self.out_queue.next() {
813 if !shared.is_writable() {
814 return;
815 }
816
817 trace!("send_to; addr={:?}; packet={:?}", self.key.addr, next.packet());
818
819 match shared.socket.send_to(next.packet().as_slice(), &self.key.addr) {
820 Ok(n) => {
821 assert_eq!(n, next.packet().as_slice().len());
822 next.sent();
823
824 sent = true;
826 }
827 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
828 shared.need_writable();
829 return;
830 }
831 Err(e) => {
832 panic!("TODO: implement error handling {:?}", e);
833 }
834 }
835 }
836
837 if sent {
838 self.reset_timeout();
839 }
840 }
841
842 fn tick(&mut self, shared: &mut Shared) -> io::Result<()> {
843 if self.state == State::Reset {
844 return Ok(());
845 }
846
847 if let Some(deadline) = self.deadline {
848 if Instant::now() >= deadline {
849 trace!("connection timed out; id={}", self.out_queue.connection_id());
850 self.out_queue.timed_out();
851 self.flush(shared);
852 }
853 }
854
855 Ok(())
856 }
857
858 fn update_delays(&mut self, now: Instant, packet: &Packet) {
859 let mut actual_delay = u32::MAX;
860
861 if packet.timestamp() > 0 {
862 let their_delay = self.out_queue.update_their_delay(packet.timestamp());
864 let prev_base_delay = self.their_delays.base_delay();
865
866 self.their_delays.add_sample(their_delay, now);
868
869 if let Some(prev) = prev_base_delay {
870 let new = self.their_delays.base_delay().unwrap();
871
872 let lt = util::wrapping_lt(new, prev, TIMESTAMP_MASK);
876 let diff = prev.wrapping_sub(new);
877
878 if lt && diff <= 10_000 {
879 self.our_delays.shift(diff);
880 }
881 }
882
883 actual_delay = packet.timestamp_diff();
884
885 if actual_delay != u32::MAX {
886 self.our_delays.add_sample(actual_delay, now);
887
888 if self.average_delay_base == 0 {
889 self.average_delay_base = actual_delay;
890 }
891
892 let average_delay_sample;
893 let dist_down = self.average_delay_base.wrapping_sub(actual_delay);
894 let dist_up = actual_delay.wrapping_sub(self.average_delay_base);
895
896 if dist_down > dist_up {
897 average_delay_sample = dist_up as i64;
898 } else {
899 average_delay_sample = -(dist_down as i64);
900 }
901
902 self.current_delay_sum = self.current_delay_sum.wrapping_add(average_delay_sample);
903 self.current_delay_samples += 1;
904
905 if now > self.average_sample_time {
906 let mut prev_average_delay = self.average_delay;
907 self.average_delay = (self.current_delay_sum / self.current_delay_samples) as i32;
908 self.average_sample_time = now + Duration::from_secs(5);
909
910 self.current_delay_sum = 0;
911 self.current_delay_samples = 0;
912
913 let min_sample = cmp::min(prev_average_delay, self.average_delay);
914 let max_sample = cmp::max(prev_average_delay, self.average_delay);
915
916 if min_sample > 0 {
917 self.average_delay_base += min_sample as u32;
918 self.average_delay -= min_sample;
919 prev_average_delay -= min_sample;
920 } else if max_sample < 0 {
921 let adjust = -max_sample;
922
923 self.average_delay_base -= adjust as u32;
924 self.average_delay += adjust;
925 prev_average_delay += adjust;
926 }
927
928 let drift = self.average_delay as i64 - prev_average_delay as i64;
930
931 self.clock_drift = ((self.clock_drift as i64 * 7 + drift) / 8) as i32;
932 }
933 }
934 }
935
936 if let Some((acked_bytes, min_rtt)) = self.out_queue.set_their_ack(packet.ack_nr(), now) {
938 let min_rtt = util::as_wrapping_micros(min_rtt);
939
940 if let Some(delay) = self.our_delays.get() {
941 if delay > min_rtt {
942 self.our_delays.shift(delay.wrapping_sub(min_rtt));
943 }
944 }
945
946 if actual_delay != u32::MAX && acked_bytes >= 1 {
947 self.apply_congestion_control(acked_bytes, actual_delay, min_rtt, now);
948 }
949 }
950 }
951
952 fn apply_congestion_control(&mut self,
953 bytes_acked: usize,
954 actual_delay: u32,
955 min_rtt: u32,
956 now: Instant)
957 {
958 trace!("applying congenstion control; bytes_acked={}; actual_delay={}; min_rtt={}",
959 bytes_acked, actual_delay, min_rtt);
960
961 let target = TARGET_DELAY;
962
963 let mut our_delay = cmp::min(self.our_delays.get().unwrap(), min_rtt);
964 let max_window = self.out_queue.max_window() as usize;
965
966 if self.clock_drift < -200_000 {
967 let penalty = (-self.clock_drift - 200_000) / 7;
968
969 if penalty > 0 {
970 our_delay += penalty as u32;
971 } else {
972 our_delay -= (-penalty) as u32;
973 }
974 }
975
976 let off_target = (target - our_delay) as f64;
977 let window_factor =
978 cmp::min(bytes_acked, max_window) as f64 /
979 cmp::max(max_window, bytes_acked) as f64;
980
981 let delay_factor = off_target / target as f64;
982 let mut scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT as f64 *
983 window_factor * delay_factor;
984
985 if scaled_gain > 0.0 && now - self.last_maxed_out_window > Duration::from_secs(1) {
986 scaled_gain = 0.0;
991 }
992
993 let ledbat_cwnd = if max_window + (scaled_gain as usize) < MIN_WINDOW_SIZE {
994 MIN_WINDOW_SIZE
995 } else {
996 max_window + scaled_gain as usize
997 };
998
999 if self.slow_start {
1000 let ss_cwnd = max_window + window_factor as usize * MAX_DATA_SIZE;
1001
1002 if ss_cwnd > SLOW_START_THRESHOLD {
1003 self.slow_start = false;
1004 } else if our_delay > (target as f64 * 0.9) as u32 {
1005 self.slow_start = false;
1008 } else {
1009 self.out_queue.set_max_window(cmp::max(ss_cwnd, ledbat_cwnd) as u32);
1010 }
1011 } else {
1012 self.out_queue.set_max_window(ledbat_cwnd as u32);
1013 }
1014 }
1015
1016 fn reset_timeout(&mut self) {
1017 self.deadline = self.out_queue.socket_timeout()
1018 .map(|dur| {
1019 trace!("resetting timeout; duration={:?}", dur);
1020 Instant::now() + dur
1021 });
1022 }
1023
1024 fn send_fin(&mut self, _: bool, shared: &mut Shared) {
1025 if self.state.is_closed() {
1026 return;
1027 }
1028
1029 self.out_queue.push(Packet::fin());
1030 self.state = State::FinSent;
1031 }
1032
1033 fn is_finalized(&self) -> bool {
1034 self.released &&
1035 ((self.out_queue.is_empty() && self.state.is_closed()) ||
1036 self.state == State::Reset)
1037 }
1038
1039 fn update_readiness(&self) -> io::Result<()> {
1041 let mut ready = Ready::empty();
1042
1043 if self.state == State::Connected {
1044 if self.is_readable() {
1045 ready.insert(Ready::readable());
1046 }
1047
1048 if self.is_writable() {
1049 ready.insert(Ready::writable());
1050 }
1051 } else if self.state.is_closed() {
1052 ready = Ready::readable();
1053 }
1054
1055 trace!("updating socket readiness; ready={:?}", ready);
1056
1057 self.set_readiness.set_readiness(ready)
1058 }
1059
1060 fn is_readable(&self) -> bool {
1063 self.in_queue.is_readable()
1064 }
1065
1066 fn is_writable(&self) -> bool {
1067 self.out_queue.is_writable()
1068 }
1069}
1070
1071impl State {
1072 fn is_closed(&self) -> bool {
1073 match *self {
1074 State::FinSent | State::Reset => true,
1075 _ => false,
1076 }
1077 }
1078}
1079
1080impl Key {
1081 fn new(receive_id: u16, addr: SocketAddr) -> Key {
1082 Key {
1083 receive_id: receive_id,
1084 addr: addr,
1085 }
1086 }
1087}