use std::cmp;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::{self, Cursor, Read, Write};

use bytes::{Buf, BufMut, BytesMut};
use error::Error;
use KcpResult;

const KCP_RTO_NDL: u32 = 30;
const KCP_RTO_MIN: u32 = 100;
const KCP_RTO_DEF: u32 = 200;
const KCP_RTO_MAX: u32 = 60000;

const KCP_CMD_PUSH: u8 = 81;
const KCP_CMD_ACK: u8 = 82;
const KCP_CMD_WASK: u8 = 83;
const KCP_CMD_WINS: u8 = 84;

const KCP_ASK_SEND: u32 = 1;
const KCP_ASK_TELL: u32 = 2;

const KCP_WND_SND: u16 = 32;
const KCP_WND_RCV: u16 = 128;

const KCP_MTU_DEF: usize = 1400;
const KCP_INTERVAL: u32 = 100;
const KCP_OVERHEAD: usize = 24;
const KCP_THRESH_INIT: u16 = 2;
const KCP_THRESH_MIN: u16 = 2;

const KCP_PROBE_INIT: u32 = 7000;
const KCP_PROBE_LIMIT: u32 = 120000;

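/// Read the conversation id (`conv`) from the first 4 bytes of a raw KCP segment.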
pub fn get_conv(mut buf: &[u8]) -> u32 {
    assert!(buf.len() >= KCP_OVERHEAD);
    buf.get_u32_le()
}

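/// Overwrite the conversation id (`conv`) in the first 4 bytes of a raw KCP segment.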
pub fn set_conv(mut buf: &mut [u8], conv: u32) {
    assert!(buf.len() >= KCP_OVERHEAD);
    buf.put_u32_le(conv)
}

#[inline]
fn bound(lower: u32, v: u32, upper: u32) -> u32 {
    cmp::min(cmp::max(lower, v), upper)
}

#[inline]
fn timediff(later: u32, earlier: u32) -> i32 {
    later as i32 - earlier as i32
}

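/// A single KCP segment: the protocol header fields plus the payload.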
#[derive(Default, Clone, Debug)]
struct KcpSegment {
    conv: u32,
    cmd: u8,
    frg: u8,
    wnd: u16,
    ts: u32,
    sn: u32,
    una: u32,
    resendts: u32,
    rto: u32,
    fastack: u32,
    xmit: u32,
    data: BytesMut,
}

impl KcpSegment {
    fn new_with_data(data: BytesMut) -> Self {
        KcpSegment {
            conv: 0,
            cmd: 0,
            frg: 0,
            wnd: 0,
            ts: 0,
            sn: 0,
            una: 0,
            resendts: 0,
            rto: 0,
            fastack: 0,
            xmit: 0,
            data,
        }
    }

    fn encode(&self, buf: &mut BytesMut) {
        if buf.remaining_mut() < self.encoded_len() {
            panic!(
                "REMAIN {} encoded {} {:?}",
                buf.remaining_mut(),
                self.encoded_len(),
                self
            );
        }

        buf.put_u32_le(self.conv);
        buf.put_u8(self.cmd);
        buf.put_u8(self.frg);
        buf.put_u16_le(self.wnd);
        buf.put_u32_le(self.ts);
        buf.put_u32_le(self.sn);
        buf.put_u32_le(self.una);
        buf.put_u32_le(self.data.len() as u32);
        buf.put_slice(&self.data);
    }

    fn encoded_len(&self) -> usize {
        KCP_OVERHEAD + self.data.len()
    }
}

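/// Thin wrapper around a writer that traces the size of every outgoing packet.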
#[derive(Default)]
struct KcpOutput<O: Write>(O);

impl<O: Write> Write for KcpOutput<O> {
    #[inline]
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        trace!("[RO] {} bytes", data.len());
        self.0.write(data)
    }

    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

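/// KCP control block: tracks the send/receive queues, window sizes, RTT
/// estimation and congestion state for one conversation.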
pub struct Kcp {
    conv: u32,
    mtu: usize,
    mss: u32,
    state: i32,

    snd_una: u32,
    snd_nxt: u32,
    rcv_nxt: u32,

    ssthresh: u16,

    rx_rttval: u32,
    rx_srtt: u32,
    rx_rto: u32,
    rx_minrto: u32,

    snd_wnd: u16,
    rcv_wnd: u16,
    rmt_wnd: u16,
    cwnd: u16,
    probe: u32,

    current: u32,
    interval: u32,
    ts_flush: u32,
    xmit: u32,

    nodelay: bool,
    updated: bool,

    ts_probe: u32,
    probe_wait: u32,

    dead_link: u32,
    incr: u32,

    snd_queue: VecDeque<KcpSegment>,
    rcv_queue: VecDeque<KcpSegment>,
    snd_buf: VecDeque<KcpSegment>,
    rcv_buf: VecDeque<KcpSegment>,

    acklist: VecDeque<(u32, u32)>,
    buf: BytesMut,

    fastresend: u32,
    nocwnd: bool,
    stream: bool,

    input_conv: bool,

    output: fn(data: &[u8]) -> io::Result<()>,
}

impl Kcp {
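    /// Creates a KCP control object. `conv` must be identical on both ends of
    /// the conversation, and `output` is the callback used to emit raw packets.
    ///
    /// A minimal usage sketch (the clock source and the transport inside the
    /// callback are illustrative assumptions, not part of this module):
    ///
    /// ```ignore
    /// use std::time::{SystemTime, UNIX_EPOCH};
    ///
    /// fn output(data: &[u8]) -> std::io::Result<()> {
    ///     // A real application would write `data` to its UDP socket here.
    ///     println!("would send {} bytes", data.len());
    ///     Ok(())
    /// }
    ///
    /// fn run() -> KcpResult<()> {
    ///     let mut kcp = Kcp::new(0x1234_5678, output);
    ///     kcp.send(b"hello")?; // queue application data
    ///     let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u32;
    ///     kcp.update(now)?; // drive timers; call this every 10-100 ms
    ///     Ok(())
    /// }
    /// ```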
    pub fn new(conv: u32, output: fn(data: &[u8]) -> io::Result<()>) -> Self {
        Kcp::construct(conv, output, false)
    }

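    /// Creates a KCP control object in stream mode, in which consecutive
    /// `send` calls may be coalesced into the same segment.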
    pub fn new_stream(conv: u32, output: fn(data: &[u8]) -> io::Result<()>) -> Self {
        Kcp::construct(conv, output, true)
    }

    fn construct(conv: u32, output: fn(data: &[u8]) -> io::Result<()>, stream: bool) -> Self {
        Kcp {
            conv,
            snd_una: 0,
            snd_nxt: 0,
            rcv_nxt: 0,
            rx_rttval: 0,
            rx_srtt: 0,
            state: 0,
            cwnd: 0,
            probe: 0,
            current: 0,
            xmit: 0,
            nodelay: false,
            updated: false,
            ts_probe: 0,
            probe_wait: 0,
            dead_link: 10,
            incr: 0,
            fastresend: 0,
            nocwnd: false,
            stream,
            snd_wnd: KCP_WND_SND,
            rcv_wnd: KCP_WND_RCV,
            rmt_wnd: KCP_WND_RCV,
            mtu: KCP_MTU_DEF,
            mss: (KCP_MTU_DEF - KCP_OVERHEAD) as u32,
            buf: BytesMut::with_capacity((KCP_MTU_DEF + KCP_OVERHEAD) * 3),
            snd_queue: VecDeque::new(),
            rcv_queue: VecDeque::new(),
            snd_buf: VecDeque::new(),
            rcv_buf: VecDeque::new(),
            acklist: VecDeque::new(),
            rx_rto: KCP_RTO_DEF,
            rx_minrto: KCP_RTO_MIN,
            interval: KCP_INTERVAL,
            ts_flush: KCP_INTERVAL,
            ssthresh: KCP_THRESH_INIT,
            input_conv: false,
            output,
        }
    }

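    /// Check the size of the next complete message in the receive queue
    /// without consuming it.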
    pub fn peeksize(&self) -> KcpResult<usize> {
        match self.rcv_queue.front() {
            Some(segment) => {
                if segment.frg == 0 {
                    return Ok(segment.data.len());
                }

                if self.rcv_queue.len() < (segment.frg + 1) as usize {
                    return Err(Error::ExpectingFragment);
                }

                let mut len = 0;

                for segment in &self.rcv_queue {
                    len += segment.data.len();
                    if segment.frg == 0 {
                        break;
                    }
                }

                Ok(len)
            }
            None => Err(Error::RecvQueueEmpty),
        }
    }

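    /// Move contiguous, in-order segments from `rcv_buf` into `rcv_queue`
    /// while the receive window has room.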
    pub fn move_buf(&mut self) {
        while !self.rcv_buf.is_empty() {
            let nrcv_que = self.rcv_queue.len();
            {
                let seg = &self.rcv_buf[0];
                if seg.sn == self.rcv_nxt && nrcv_que < self.rcv_wnd as usize {
                    self.rcv_nxt += 1;
                } else {
                    break;
                }
            }

            let seg = self.rcv_buf.pop_front().unwrap();
            self.rcv_queue.push_back(seg);
        }
    }

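    /// Receive one complete message from the receive queue into `buf`,
    /// reassembling fragments. Returns the number of bytes written.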
    pub fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
        if self.rcv_queue.is_empty() {
            return Err(Error::RecvQueueEmpty);
        }

        let peeksize = self.peeksize()?;

        if peeksize > buf.len() {
            debug!("recv peeksize={} bufsize={} too small", peeksize, buf.len());
            return Err(Error::UserBufTooSmall);
        }

        let recover = self.rcv_queue.len() >= self.rcv_wnd as usize;

        let mut cur = Cursor::new(buf);
        while let Some(seg) = self.rcv_queue.pop_front() {
            cur.write_all(&seg.data)?;

            trace!("recv sn={}", seg.sn);

            if seg.frg == 0 {
                break;
            }
        }
        assert_eq!(cur.position() as usize, peeksize);

        self.move_buf();

        if self.rcv_queue.len() < self.rcv_wnd as usize && recover {
            self.probe |= KCP_ASK_TELL;
        }

        Ok(cur.position() as usize)
    }

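    /// Queue `buf` for sending, splitting it into at most `mss`-sized
    /// fragments. In stream mode the tail of the last queued segment is
    /// filled first. Returns the number of bytes accepted.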
    pub fn send(&mut self, mut buf: &[u8]) -> KcpResult<usize> {
        let mut sent_size = 0;

        assert!(self.mss > 0);

        if self.stream {
            if let Some(old) = self.snd_queue.back_mut() {
                let l = old.data.len();
                if l < self.mss as usize {
                    let capacity = self.mss as usize - l;
                    let extend = cmp::min(buf.len(), capacity);

                    trace!(
                        "send stream mss={} last length={} extend={}",
                        self.mss,
                        l,
                        extend
                    );

                    let (lf, rt) = buf.split_at(extend);
                    old.data.extend_from_slice(lf);
                    buf = rt;

                    old.frg = 0;
                    sent_size += extend;
                }

                if buf.is_empty() {
                    return Ok(sent_size);
                }
            }
        }

        let count = if buf.len() <= self.mss as usize {
            1
        } else {
            (buf.len() + self.mss as usize - 1) / self.mss as usize
        };

        if count >= KCP_WND_RCV as usize {
            debug!("send bufsize={} mss={} too large", buf.len(), self.mss);
            return Err(Error::UserBufTooBig);
        }
        assert!(count > 0);

        for i in 0..count {
            let size = cmp::min(self.mss as usize, buf.len());

            let (lf, rt) = buf.split_at(size);

            let mut new_segment = KcpSegment::new_with_data(lf.into());
            buf = rt;

            new_segment.frg = if self.stream {
                0
            } else {
                (count - i - 1) as u8
            };

            self.snd_queue.push_back(new_segment);
            sent_size += size;
        }

        Ok(sent_size)
    }

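    /// Feed a new RTT sample into the smoothed RTT / RTT variance estimators
    /// and recompute the retransmission timeout.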
    fn update_ack(&mut self, rtt: u32) {
        if self.rx_srtt == 0 {
            self.rx_srtt = rtt;
            self.rx_rttval = rtt / 2;
        } else {
            let delta = if rtt > self.rx_srtt {
                rtt - self.rx_srtt
            } else {
                self.rx_srtt - rtt
            };
            self.rx_rttval = (3 * self.rx_rttval + delta) / 4;
            self.rx_srtt = (7 * self.rx_srtt + rtt) / 8;
            if self.rx_srtt < 1 {
                self.rx_srtt = 1;
            }
        }
        let rto = self.rx_srtt + cmp::max(self.interval, 4 * self.rx_rttval);
        self.rx_rto = bound(self.rx_minrto, rto, KCP_RTO_MAX);
    }

    #[inline]
    fn shrink_buf(&mut self) {
        self.snd_una = match self.snd_buf.front() {
            Some(seg) => seg.sn,
            None => self.snd_nxt,
        };
    }

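    /// Remove the segment acknowledged by `sn` from the send buffer.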
    fn parse_ack(&mut self, sn: u32) {
        if timediff(sn, self.snd_una) < 0 || timediff(sn, self.snd_nxt) >= 0 {
            return;
        }

        for i in 0..self.snd_buf.len() {
            match sn.cmp(&self.snd_buf[i].sn) {
                Ordering::Equal => {
                    // Removing shrinks the buffer, so stop here to avoid
                    // indexing past the new end.
                    self.snd_buf.remove(i);
                    break;
                }
                Ordering::Less => break,
                _ => (),
            }
        }
    }

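    /// Drop every in-flight segment covered by the peer's cumulative
    /// acknowledgement (`una`).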
    fn parse_una(&mut self, una: u32) {
        while !self.snd_buf.is_empty() {
            if timediff(una, self.snd_buf[0].sn) > 0 {
                self.snd_buf.pop_front();
            } else {
                break;
            }
        }
    }

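    /// Increment the fast-ack counter of every in-flight segment older than
    /// `sn`; segments skipped often enough are fast-retransmitted in `flush`.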
    fn parse_fastack(&mut self, sn: u32) {
        if timediff(sn, self.snd_una) < 0 || timediff(sn, self.snd_nxt) >= 0 {
            return;
        }

        for seg in &mut self.snd_buf {
            if timediff(sn, seg.sn) < 0 {
                break;
            } else if sn != seg.sn {
                seg.fastack += 1;
            }
        }
    }

    #[inline]
    fn ack_push(&mut self, sn: u32, ts: u32) {
        self.acklist.push_back((sn, ts));
    }

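    /// Insert a received data segment into `rcv_buf` in sequence-number order,
    /// discarding duplicates and segments outside the receive window.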
    fn parse_data(&mut self, new_segment: KcpSegment) {
        let sn = new_segment.sn;

        if timediff(sn, self.rcv_nxt + self.rcv_wnd as u32) >= 0 || timediff(sn, self.rcv_nxt) < 0 {
            return;
        }

        let mut repeat = false;
        let mut new_index = self.rcv_buf.len();

        for segment in self.rcv_buf.iter().rev() {
            if segment.sn == sn {
                repeat = true;
                break;
            } else if timediff(sn, segment.sn) > 0 {
                break;
            }
            new_index -= 1;
        }

        if !repeat {
            self.rcv_buf.insert(new_index, new_segment);
        }

        self.move_buf();
    }

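    /// Tell the control block to adopt the `conv` carried by the next `input`
    /// packet instead of rejecting it as a mismatch.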
    #[inline]
    pub fn input_conv(&mut self) {
        self.input_conv = true;
    }

    #[inline]
    pub fn waiting_conv(&self) -> bool {
        self.input_conv
    }

    #[inline]
    pub fn set_conv(&mut self, conv: u32) {
        self.conv = conv;
    }

    #[inline]
    pub fn conv(&self) -> u32 {
        self.conv
    }

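    /// Feed one raw packet received from the underlying transport into the
    /// control block. Returns the number of bytes consumed.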
    pub fn input(&mut self, buf: &[u8]) -> KcpResult<usize> {
        let input_size = buf.len();

        trace!("[RI] {} bytes", buf.len());

        if buf.len() < KCP_OVERHEAD {
            debug!(
                "input bufsize={} too small, at least {}",
                buf.len(),
                KCP_OVERHEAD
            );
            return Err(Error::InvalidSegmentSize(buf.len()));
        }

        let mut flag = false;
        let mut max_ack = 0;
        let old_una = self.snd_una;

        let mut buf = Cursor::new(buf);
        while buf.remaining() >= KCP_OVERHEAD {
            let conv = buf.get_u32_le();
            if conv != self.conv {
                if self.input_conv {
                    debug!("input conv={} updated, original conv={}", conv, self.conv);
                    self.conv = conv;
                    self.input_conv = false;
                } else {
                    debug!("input conv={} expected conv={} not match", conv, self.conv);
                    return Err(Error::ConvInconsistent(self.conv, conv));
                }
            }

            let cmd = buf.get_u8();
            let frg = buf.get_u8();
            let wnd = buf.get_u16_le();
            let ts = buf.get_u32_le();
            let sn = buf.get_u32_le();
            let una = buf.get_u32_le();
            let len = buf.get_u32_le() as usize;

            if buf.remaining() < len {
                debug!(
                    "input bufsize={} payload length={} remaining={} not match",
                    input_size,
                    len,
                    buf.remaining()
                );
                return Err(Error::InvalidSegmentDataSize(len, buf.remaining()));
            }

            match cmd {
                KCP_CMD_PUSH | KCP_CMD_ACK | KCP_CMD_WASK | KCP_CMD_WINS => {}
                _ => {
                    debug!("input cmd={} unrecognized", cmd);
                    return Err(Error::UnsupportedCmd(cmd));
                }
            }

            self.rmt_wnd = wnd;

            self.parse_una(una);
            self.shrink_buf();

            let mut has_read_data = false;

            match cmd {
                KCP_CMD_ACK => {
                    let rtt = timediff(self.current, ts);
                    if rtt >= 0 {
                        self.update_ack(rtt as u32);
                    }
                    self.parse_ack(sn);
                    self.shrink_buf();

                    if !flag {
                        max_ack = sn;
                        flag = true;
                    } else if timediff(sn, max_ack) > 0 {
                        max_ack = sn;
                    }

                    trace!(
                        "input ack: sn={} rtt={} rto={}",
                        sn,
                        timediff(self.current, ts),
                        self.rx_rto
                    );
                }
                KCP_CMD_PUSH => {
                    trace!("input psh: sn={} ts={}", sn, ts);

                    if timediff(sn, self.rcv_nxt + self.rcv_wnd as u32) < 0 {
                        self.ack_push(sn, ts);
                        if timediff(sn, self.rcv_nxt) >= 0 {
                            let mut sbuf = BytesMut::with_capacity(len);
                            unsafe {
                                sbuf.set_len(len);
                            }
                            buf.read_exact(&mut sbuf).unwrap();
                            has_read_data = true;

                            let mut segment = KcpSegment::new_with_data(sbuf);

                            segment.conv = conv;
                            segment.cmd = cmd;
                            segment.frg = frg;
                            segment.wnd = wnd;
                            segment.ts = ts;
                            segment.sn = sn;
                            segment.una = una;

                            self.parse_data(segment);
                        }
                    }
                }
                KCP_CMD_WASK => {
                    trace!("input probe");
                    self.probe |= KCP_ASK_TELL;
                }
                KCP_CMD_WINS => {
                    trace!("input wins: {}", wnd);
                }
                _ => unreachable!(),
            }

            if !has_read_data {
                let next_pos = buf.position() + len as u64;
                buf.set_position(next_pos);
            }
        }

        if flag {
            self.parse_fastack(max_ack);
        }

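        // Congestion window growth: slow start below ssthresh (one packet per
        // acked update), then additive increase driven by the byte counter
        // `incr`, capped by the remote window.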
        if self.snd_una > old_una && self.cwnd < self.rmt_wnd {
            let mss = self.mss;
            if self.cwnd < self.ssthresh {
                self.cwnd += 1;
                self.incr += mss;
            } else {
                if self.incr < mss {
                    self.incr = mss;
                }
                self.incr += (mss * mss) / self.incr + (mss / 16);
                if (self.cwnd + 1) as u32 * mss <= self.incr {
                    self.cwnd += 1;
                }
            }
            if self.cwnd > self.rmt_wnd {
                self.cwnd = self.rmt_wnd;
                self.incr = self.rmt_wnd as u32 * mss;
            }
        }

        Ok(buf.position() as usize)
    }

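    /// Number of free slots left in the receive window, advertised to the peer
    /// in every outgoing segment.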
    fn wnd_unused(&self) -> u16 {
        if self.rcv_queue.len() < self.rcv_wnd as usize {
            self.rcv_wnd - self.rcv_queue.len() as u16
        } else {
            0
        }
    }

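    /// Encode every pending ACK in `acklist` into the staging buffer, sending
    /// the buffer through `output` whenever it would exceed the MTU.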
    fn _flush_ack(&mut self, segment: &mut KcpSegment) -> KcpResult<()> {
        for &(sn, ts) in &self.acklist {
            if self.buf.len() + KCP_OVERHEAD > self.mtu {
                (self.output)(&self.buf)?;
                self.buf.clear();
            }
            segment.sn = sn;
            segment.ts = ts;
            segment.encode(&mut self.buf);
        }
        self.acklist.clear();

        Ok(())
    }

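    /// Schedule window probes while the remote window is reported as zero,
    /// with exponential backoff between probes.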
    fn probe_wnd_size(&mut self) {
        if self.rmt_wnd == 0 {
            if self.probe_wait == 0 {
                self.probe_wait = KCP_PROBE_INIT;
                self.ts_probe = self.current + self.probe_wait;
            } else if timediff(self.current, self.ts_probe) >= 0 {
                // Only back off and request another probe once the previous
                // probe deadline has actually passed.
                if self.probe_wait < KCP_PROBE_INIT {
                    self.probe_wait = KCP_PROBE_INIT;
                }
                self.probe_wait += self.probe_wait / 2;
                if self.probe_wait > KCP_PROBE_LIMIT {
                    self.probe_wait = KCP_PROBE_LIMIT;
                }
                self.ts_probe = self.current + self.probe_wait;
                self.probe |= KCP_ASK_SEND;
            }
        } else {
            self.ts_probe = 0;
            self.probe_wait = 0;
        }
    }

    fn _flush_probe_commands(&mut self, cmd: u8, segment: &mut KcpSegment) -> KcpResult<()> {
        segment.cmd = cmd;
        if self.buf.len() + KCP_OVERHEAD > self.mtu {
            (self.output)(&self.buf)?;
            self.buf.clear();
        }
        segment.encode(&mut self.buf);
        Ok(())
    }

    fn flush_probe_commands(&mut self, segment: &mut KcpSegment) -> KcpResult<()> {
        if (self.probe & KCP_ASK_SEND) != 0 {
            self._flush_probe_commands(KCP_CMD_WASK, segment)?;
        }

        if (self.probe & KCP_ASK_TELL) != 0 {
            self._flush_probe_commands(KCP_CMD_WINS, segment)?;
        }
        self.probe = 0;
        Ok(())
    }

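    /// Flush only the pending ACKs. `update` must have been called at least
    /// once before this is used.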
    pub fn flush_ack(&mut self) -> KcpResult<()> {
        if !self.updated {
            debug!("flush updated() must be called at least once");
            return Err(Error::NeedUpdate);
        }

        let mut segment = KcpSegment {
            conv: self.conv,
            cmd: KCP_CMD_ACK,
            wnd: self.wnd_unused(),
            una: self.rcv_nxt,
            ..Default::default()
        };

        self._flush_ack(&mut segment)
    }

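    /// Flush all pending output: ACKs, window probes, new data that fits in
    /// the congestion window, and any segments due for retransmission.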
    pub fn flush(&mut self) -> KcpResult<()> {
        if !self.updated {
            debug!("flush updated() must be called at least once");
            return Err(Error::NeedUpdate);
        }

        let mut segment = KcpSegment {
            conv: self.conv,
            cmd: KCP_CMD_ACK,
            wnd: self.wnd_unused(),
            una: self.rcv_nxt,
            ..Default::default()
        };

        self._flush_ack(&mut segment)?;
        self.probe_wnd_size();
        self.flush_probe_commands(&mut segment)?;

        let mut cwnd = cmp::min(self.snd_wnd, self.rmt_wnd);
        if !self.nocwnd {
            cwnd = cmp::min(self.cwnd, cwnd);
        }

        while timediff(self.snd_nxt, self.snd_una + cwnd as u32) < 0 {
            match self.snd_queue.pop_front() {
                Some(mut new_segment) => {
                    new_segment.conv = self.conv;
                    new_segment.cmd = KCP_CMD_PUSH;
                    new_segment.wnd = segment.wnd;
                    new_segment.ts = self.current;
                    new_segment.sn = self.snd_nxt;
                    self.snd_nxt += 1;
                    new_segment.una = self.rcv_nxt;
                    new_segment.resendts = self.current;
                    new_segment.rto = self.rx_rto;
                    new_segment.fastack = 0;
                    new_segment.xmit = 0;
                    self.snd_buf.push_back(new_segment);
                }
                None => break,
            }
        }

        let resent = if self.fastresend > 0 {
            self.fastresend
        } else {
            u32::max_value()
        };

        let rtomin = if !self.nodelay { self.rx_rto >> 3 } else { 0 };

        let mut lost = false;
        let mut change = 0;

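        // A segment is (re)sent if it has never been transmitted, if its
        // retransmission timer has expired, or if it has been skipped by
        // enough later acks to trigger fast retransmit.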
        for snd_segment in &mut self.snd_buf {
            let mut need_send = false;

            if snd_segment.xmit == 0 {
                need_send = true;
                snd_segment.xmit += 1;
                snd_segment.rto = self.rx_rto;
                snd_segment.resendts = self.current + snd_segment.rto + rtomin;
            } else if timediff(self.current, snd_segment.resendts) >= 0 {
                need_send = true;
                snd_segment.xmit += 1;
                self.xmit += 1;
                if !self.nodelay {
                    snd_segment.rto += self.rx_rto;
                } else {
                    snd_segment.rto += self.rx_rto / 2;
                }
                snd_segment.resendts = self.current + snd_segment.rto;
                lost = true;
            } else if snd_segment.fastack >= resent {
                need_send = true;
                snd_segment.xmit += 1;
                snd_segment.fastack = 0;
                snd_segment.resendts = self.current + snd_segment.rto;
                change += 1;
            }

            if need_send {
                snd_segment.ts = self.current;
                snd_segment.wnd = segment.wnd;
                snd_segment.una = self.rcv_nxt;

                let need = KCP_OVERHEAD + snd_segment.data.len();

                if self.buf.len() + need > self.mtu {
                    (self.output)(&self.buf)?;
                    self.buf.clear();
                }

                snd_segment.encode(&mut self.buf);

                if snd_segment.xmit >= self.dead_link {
                    self.state = -1;
                }
            }
        }

        if !self.buf.is_empty() {
            (self.output)(&self.buf)?;
            self.buf.clear();
        }

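        // Congestion reaction: fast retransmit halves the in-flight amount
        // (fast-recovery style), while a timeout loss collapses the congestion
        // window back to one segment.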
        if change > 0 {
            let inflight = self.snd_nxt - self.snd_una;
            self.ssthresh = inflight as u16 / 2;
            if self.ssthresh < KCP_THRESH_MIN {
                self.ssthresh = KCP_THRESH_MIN;
            }
            self.cwnd = self.ssthresh + resent as u16;
            self.incr = self.cwnd as u32 * self.mss;
        }

        if lost {
            self.ssthresh = cwnd / 2;
            if self.ssthresh < KCP_THRESH_MIN {
                self.ssthresh = KCP_THRESH_MIN;
            }
            self.cwnd = 1;
            self.incr = self.mss;
        }

        if self.cwnd < 1 {
            self.cwnd = 1;
            self.incr = self.mss;
        }

        Ok(())
    }

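    /// Drive the protocol clock. Call this every 10-100 ms with the current
    /// timestamp in milliseconds; it flushes output when the flush deadline
    /// has been reached. Use `check` to learn when the next call is due.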
    pub fn update(&mut self, current: u32) -> KcpResult<()> {
        self.current = current;

        if !self.updated {
            self.updated = true;
            self.ts_flush = self.current;
        }

        let mut slap = timediff(self.current, self.ts_flush);

        if slap >= 10000 || slap < -10000 {
            self.ts_flush = self.current;
            slap = 0;
        }

        if slap >= 0 {
            self.ts_flush += self.interval;
            if timediff(self.current, self.ts_flush) >= 0 {
                self.ts_flush = self.current + self.interval;
            }
            self.flush()?;
        }

        Ok(())
    }

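    /// Determine how many milliseconds may pass before `update` should be
    /// called again, assuming no intervening `send` or `input`. A return
    /// value of 0 means `update` should be called immediately.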
    pub fn check(&self, current: u32) -> u32 {
        if !self.updated {
            return 0;
        }

        let mut ts_flush = self.ts_flush;
        let mut tm_packet = u32::max_value();

        if timediff(current, ts_flush) >= 10000 || timediff(current, ts_flush) < -10000 {
            ts_flush = current;
        }

        if timediff(current, ts_flush) >= 0 {
            return 0;
        }

        let tm_flush = timediff(ts_flush, current) as u32;
        for seg in &self.snd_buf {
            let diff = timediff(seg.resendts, current);
            if diff <= 0 {
                return 0;
            }
            if (diff as u32) < tm_packet {
                tm_packet = diff as u32;
            }
        }

        cmp::min(cmp::min(tm_packet, tm_flush), self.interval)
    }

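    /// Change the MTU (default 1400). The MTU must be at least 50 bytes and
    /// no smaller than the 24-byte segment header.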
    pub fn set_mtu(&mut self, mtu: usize) -> KcpResult<()> {
        if mtu < 50 || mtu < KCP_OVERHEAD {
            debug!("set_mtu mtu={} invalid", mtu);
            return Err(Error::InvalidMtu(mtu));
        }

        self.mtu = mtu;
        self.mss = (self.mtu - KCP_OVERHEAD) as u32;

        let additional = ((mtu + KCP_OVERHEAD) * 3) as isize - self.buf.capacity() as isize;
        if additional > 0 {
            self.buf.reserve(additional as usize);
        }

        Ok(())
    }

    pub fn mtu(&self) -> usize {
        self.mtu
    }

    pub fn set_interval(&mut self, mut interval: u32) {
        if interval > 5000 {
            interval = 5000;
        } else if interval < 10 {
            interval = 10;
        }
        self.interval = interval;
    }

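    /// Configure latency/throughput trade-offs: `nodelay` switches to the
    /// aggressive minimum RTO, `interval` is the internal clock in
    /// milliseconds (clamped to 10-5000 when non-negative), `resend` sets the
    /// fast-retransmit threshold (0 disables it), and `nc` disables
    /// congestion control.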
    pub fn set_nodelay(&mut self, nodelay: bool, mut interval: i32, resend: i32, nc: bool) {
        if nodelay {
            self.nodelay = true;
            self.rx_minrto = KCP_RTO_NDL;
        } else {
            self.nodelay = false;
            self.rx_minrto = KCP_RTO_MIN;
        }

        if interval >= 0 {
            if interval > 5000 {
                interval = 5000;
            } else if interval < 10 {
                interval = 10;
            }

            self.interval = interval as u32;
        }

        if resend >= 0 {
            self.fastresend = resend as u32;
        }

        self.nocwnd = nc;
    }

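    /// Set the send and receive window sizes in packets (defaults 32 and 128).
    /// The receive window is never set below 128.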
    pub fn set_wndsize(&mut self, sndwnd: u16, rcvwnd: u16) {
        if sndwnd > 0 {
            self.snd_wnd = sndwnd;
        }

        if rcvwnd > 0 {
            self.rcv_wnd = cmp::max(rcvwnd, KCP_WND_RCV);
        }
    }

    pub fn snd_wnd(&self) -> u16 {
        self.snd_wnd
    }

    pub fn rcv_wnd(&self) -> u16 {
        self.rcv_wnd
    }

    pub fn wait_snd(&self) -> usize {
        self.snd_buf.len() + self.snd_queue.len()
    }

    pub fn set_rx_minrto(&mut self, rto: u32) {
        self.rx_minrto = rto;
    }

    pub fn set_fast_resend(&mut self, fr: u32) {
        self.fastresend = fr;
    }

    pub fn header_len() -> usize {
        KCP_OVERHEAD
    }

    pub fn is_stream(&self) -> bool {
        self.stream
    }

    pub fn mss(&self) -> u32 {
        self.mss
    }

    pub fn set_maximum_resend_times(&mut self, dead_link: u32) {
        self.dead_link = dead_link;
    }

    pub fn is_dead_link(&self) -> bool {
        self.state != 0
    }
}