kcprs/
kcp.rs

1//! KCP
2
3use std::cmp;
4use std::cmp::Ordering;
5use std::collections::VecDeque;
6use std::io::{self, Cursor, Read, Write};
7
8use bytes::{Buf, BufMut, BytesMut};
9use error::Error;
10use KcpResult;
11
// Retransmission-timeout (RTO) limits, expressed in the same time unit as the
// timestamps handed to `Kcp::update`.
// NOTE(review): KCP_RTO_NDL is not referenced in the visible part of this
// file; presumably it is the minimum RTO applied in nodelay mode — confirm.
12const KCP_RTO_NDL: u32 = 30;
// Initial value of `Kcp::rx_minrto` (lower bound applied in `update_ack`).
13const KCP_RTO_MIN: u32 = 100;
// Initial value of `Kcp::rx_rto`, used before any RTT sample exists.
14const KCP_RTO_DEF: u32 = 200;
// Upper bound applied to the computed RTO in `update_ack`.
15const KCP_RTO_MAX: u32 = 60000;
16
// Segment command codes carried in the `cmd` header byte.
// PUSH: data segment.
17const KCP_CMD_PUSH: u8 = 81;
// ACK: acknowledges one received data segment.
18const KCP_CMD_ACK: u8 = 82;
// WASK: window probe; the receiver answers with a WINS segment.
19const KCP_CMD_WASK: u8 = 83;
// WINS: announces the sender's current receive window.
20const KCP_CMD_WINS: u8 = 84;
21
// Bit flags stored in `Kcp::probe`.
// ASK_SEND: a WASK probe must be emitted on the next flush.
22const KCP_ASK_SEND: u32 = 1;
// ASK_TELL: a WINS announcement must be emitted on the next flush.
23const KCP_ASK_TELL: u32 = 2;
24
// Default send window, in segments.
25const KCP_WND_SND: u16 = 32;
// Default receive window, in segments; `send` also uses it as the limit on
// the number of fragments in a single message.
26const KCP_WND_RCV: u16 = 128;
27
// Default maximum transmission unit, in bytes.
28const KCP_MTU_DEF: usize = 1400;
29// const KCP_ACK_FAST: u32 = 3;
30
// Default flush interval.
31const KCP_INTERVAL: u32 = 100;
// Size in bytes of the fixed segment header written by `KcpSegment::encode`
// (conv 4 + cmd 1 + frg 1 + wnd 2 + ts 4 + sn 4 + una 4 + len 4).
32const KCP_OVERHEAD: usize = 24;
33// const KCP_DEADLINK: u32 = 20;
34
// Initial slow-start threshold, in segments.
35const KCP_THRESH_INIT: u16 = 2;
// Floor applied whenever the slow-start threshold is recomputed in `flush`.
36const KCP_THRESH_MIN: u16 = 2;
37
// First wait before probing a zero remote window (see `probe_wnd_size`).
38const KCP_PROBE_INIT: u32 = 7000;
// Cap on the window-probe wait as it backs off.
39const KCP_PROBE_LIMIT: u32 = 120000;
40
41/// Read `conv` from raw buffer
42pub fn get_conv(mut buf: &[u8]) -> u32 {
43    assert!(buf.len() >= KCP_OVERHEAD);
44    buf.get_u32_le()
45}
46
47/// Set `conv` to raw buffer
48pub fn set_conv(mut buf: &mut [u8], conv: u32) {
49    assert!(buf.len() >= KCP_OVERHEAD);
50    buf.put_u32_le(conv)
51}
52
/// Limit `v` to the range `lower..=upper`.
///
/// The lower bound is applied first and the upper bound last, so when
/// `lower > upper` the result is `upper`.
#[inline]
fn bound(lower: u32, v: u32, upper: u32) -> u32 {
    let raised = if v < lower { lower } else { v };
    if raised > upper {
        upper
    } else {
        raised
    }
}
57
/// Signed distance from `earlier` to `later` on the wrapping `u32` clock /
/// sequence space.
///
/// The subtraction is performed modulo 2^32 and then reinterpreted as `i32`,
/// so values that have wrapped past `u32::MAX` still compare correctly as
/// long as they are less than 2^31 apart. Never panics, unlike a plain
/// `i32` subtraction, which overflows for inputs such as
/// `(0x8000_0000, 1)`.
#[inline]
fn timediff(later: u32, earlier: u32) -> i32 {
    later.wrapping_sub(earlier) as i32
}
62
63#[derive(Default, Clone, Debug)]
64struct KcpSegment {
65    conv: u32,
66    cmd: u8,
67    frg: u8,
68    wnd: u16,
69    ts: u32,
70    sn: u32,
71    una: u32,
72    resendts: u32,
73    rto: u32,
74    fastack: u32,
75    xmit: u32,
76    data: BytesMut,
77}
78
79impl KcpSegment {
80    fn new_with_data(data: BytesMut) -> Self {
81        KcpSegment {
82            conv: 0,
83            cmd: 0,
84            frg: 0,
85            wnd: 0,
86            ts: 0,
87            sn: 0,
88            una: 0,
89            resendts: 0,
90            rto: 0,
91            fastack: 0,
92            xmit: 0,
93            data,
94        }
95    }
96
97    fn encode(&self, buf: &mut BytesMut) {
98        if buf.remaining_mut() < self.encoded_len() {
99            panic!(
100                "REMAIN {} encoded {} {:?}",
101                buf.remaining_mut(),
102                self.encoded_len(),
103                self
104            );
105        }
106
107        buf.put_u32_le(self.conv);
108        buf.put_u8(self.cmd);
109        buf.put_u8(self.frg);
110        buf.put_u16_le(self.wnd);
111        buf.put_u32_le(self.ts);
112        buf.put_u32_le(self.sn);
113        buf.put_u32_le(self.una);
114        buf.put_u32_le(self.data.len() as u32);
115        buf.put_slice(&self.data);
116    }
117
118    fn encoded_len(&self) -> usize {
119        KCP_OVERHEAD + self.data.len()
120    }
121}
122
123#[derive(Default)]
124struct KcpOutput<O: Write>(O);
125
126impl<O: Write> Write for KcpOutput<O> {
127    #[inline]
128    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
129        trace!("[RO] {} bytes", data.len());
130        self.0.write(data)
131    }
132
133    #[inline]
134    fn flush(&mut self) -> io::Result<()> {
135        self.0.flush()
136    }
137}
138
139/// KCP control
    ///
    /// Holds the complete state of one KCP conversation: sequence counters,
    /// RTT estimation, window sizes, timers, and the send/receive queues.
140pub struct Kcp {
141    /// Conversation ID
142    conv: u32,
143    /// Maximum Transmission Unit
144    mtu: usize,
145    /// Maximum Segment Size
146    mss: u32,
147    /// Connection state
    /// (`flush` sets it to -1 once a segment has been transmitted `dead_link` times)
148    state: i32,
149
150    /// First unacknowledged packet
151    snd_una: u32,
152    /// Next packet
    /// (sequence number that `flush` assigns to the next segment it sends)
153    snd_nxt: u32,
154    /// Next packet to be received
155    rcv_nxt: u32,
156
157    /// Congestion window threshold (slow-start threshold)
158    ssthresh: u16,
159
160    /// ACK receive variable RTT
161    rx_rttval: u32,
162    /// ACK receive static RTT
163    rx_srtt: u32,
164    /// Resend time (calculated by ACK delay time)
165    rx_rto: u32,
166    /// Minimal resend timeout
167    rx_minrto: u32,
168
169    /// Send window
170    snd_wnd: u16,
171    /// Receive window
172    rcv_wnd: u16,
173    /// Remote receive window
174    rmt_wnd: u16,
175    /// Congestion window
176    cwnd: u16,
177    /// Check window
178    /// - KCP_ASK_TELL, telling window size to remote
179    /// - KCP_ASK_SEND, ask remote for window size
180    probe: u32,
181
182    /// Last update time
183    current: u32,
184    /// Flush interval
185    interval: u32,
186    /// Timestamp of the next scheduled flush
187    ts_flush: u32,
    /// Number of timeout-triggered retransmissions performed by `flush`
188    xmit: u32,
189
190    /// Enable nodelay
191    nodelay: bool,
192    /// Whether `update` has been called at least once
193    updated: bool,
194
195    /// Next check window timestamp
196    ts_probe: u32,
197    /// Check window wait time
198    probe_wait: u32,
199
200    /// Transmission count at which `flush` marks the link dead (`state = -1`)
201    dead_link: u32,
202    /// Congestion window expressed in bytes (maintained alongside `cwnd`)
203    incr: u32,
204
    /// Segments queued by `send` that have not been given a sequence number yet
205    snd_queue: VecDeque<KcpSegment>,
    /// In-order received segments waiting to be read by `recv`
206    rcv_queue: VecDeque<KcpSegment>,
    /// Segments that have been sent and are awaiting acknowledgement
207    snd_buf: VecDeque<KcpSegment>,
    /// Received segments, ordered by `sn`, waiting to become deliverable
208    rcv_buf: VecDeque<KcpSegment>,
209
210    /// Pending ACKs as `(sn, ts)` pairs
211    acklist: VecDeque<(u32, u32)>,
    /// Scratch buffer in which outgoing segments are batched before `output` is called
212    buf: BytesMut,
213
214    /// ACK number to trigger fast resend
215    fastresend: u32,
216    /// Disable congestion control
217    nocwnd: bool,
218    /// Enable stream mode
219    stream: bool,
220
221    /// Get conv from the next input call
222    input_conv: bool,
223
    /// Callback invoked with each raw packet that must be written to the transport
224    output: fn (data:&[u8])->std::io::Result<()>,
225}
226
227impl Kcp {
228    /// Creates a KCP control object, `conv` must be equal in both endpoints in one connection.
229    /// `output` is the callback object for writing.
230    ///
231    /// `conv` represents conversation.
232    pub fn new(conv: u32, output: fn (data:&[u8])->std::io::Result<()>) -> Self {
233        Kcp::construct(conv, output, false)
234    }
235
236    /// Creates a KCP control object in stream mode, `conv` must be equal in both endpoints in one connection.
237    /// `output` is the callback object for writing.
238    ///
239    /// `conv` represents conversation.
240    pub fn new_stream(conv: u32, output: fn (data:&[u8])->std::io::Result<()>) -> Self {
241        Kcp::construct(conv, output, true)
242    }
243
244    fn construct(conv: u32, output: fn (data:&[u8])->std::io::Result<()>, stream: bool) -> Self {
245        Kcp {
246            conv,
247            snd_una: 0,
248            snd_nxt: 0,
249            rcv_nxt: 0,
250            rx_rttval: 0,
251            rx_srtt: 0,
252            state: 0,
253            cwnd: 0,
254            probe: 0,
255            current: 0,
256            xmit: 0,
257            nodelay: false,
258            updated: false,
259            ts_probe: 0,
260            probe_wait: 0,
261            dead_link: 10,
262            incr: 0,
263            fastresend: 0,
264            nocwnd: false,
265            stream,
266            snd_wnd: KCP_WND_SND,
267            rcv_wnd: KCP_WND_RCV,
268            rmt_wnd: KCP_WND_RCV,
269            mtu: KCP_MTU_DEF,
270            mss: (KCP_MTU_DEF - KCP_OVERHEAD) as u32,
271            buf: BytesMut::with_capacity((KCP_MTU_DEF + KCP_OVERHEAD) * 3),
272            snd_queue: VecDeque::new(),
273            rcv_queue: VecDeque::new(),
274            snd_buf: VecDeque::new(),
275            rcv_buf: VecDeque::new(),
276            acklist: VecDeque::new(),
277            rx_rto: KCP_RTO_DEF,
278            rx_minrto: KCP_RTO_MIN,
279            interval: KCP_INTERVAL,
280            ts_flush: KCP_INTERVAL,
281            ssthresh: KCP_THRESH_INIT,
282            input_conv: false,
283            output: output
284        }
285    }
286
287    /// Check buffer size without actually consuming it
288    pub fn peeksize(&self) -> KcpResult<usize> {
289        match self.rcv_queue.front() {
290            Some(segment) => {
291                if segment.frg == 0 {
292                    return Ok(segment.data.len());
293                }
294
295                if self.rcv_queue.len() < (segment.frg + 1) as usize {
296                    return Err(Error::ExpectingFragment);
297                }
298
299                let mut len = 0;
300
301                for segment in &self.rcv_queue {
302                    len += segment.data.len();
303                    if segment.frg == 0 {
304                        break;
305                    }
306                }
307
308                Ok(len)
309            }
310            None => Err(Error::RecvQueueEmpty),
311        }
312    }
313
314    // move available data from rcv_buf -> rcv_queue
315    pub fn move_buf(&mut self) {
316        while !self.rcv_buf.is_empty() {
317            let nrcv_que = self.rcv_queue.len();
318            {
319                let seg = &self.rcv_buf[0];
320                if seg.sn == self.rcv_nxt && nrcv_que < self.rcv_wnd as usize {
321                    self.rcv_nxt += 1;
322                } else {
323                    break;
324                }
325            }
326
327            let seg = self.rcv_buf.pop_front().unwrap();
328            self.rcv_queue.push_back(seg);
329        }
330    }
331
332    /// Receive data from buffer
333    pub fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
334        if self.rcv_queue.is_empty() {
335            return Err(Error::RecvQueueEmpty);
336        }
337
338        let peeksize = self.peeksize()?;
339
340        if peeksize > buf.len() {
341            debug!("recv peeksize={} bufsize={} too small", peeksize, buf.len());
342            return Err(Error::UserBufTooSmall);
343        }
344
345        let recover = self.rcv_queue.len() >= self.rcv_wnd as usize;
346
347        // Merge fragment
348        let mut cur = Cursor::new(buf);
349        while let Some(seg) = self.rcv_queue.pop_front() {
350            cur.write_all(&seg.data)?;
351
352            trace!("recv sn={}", seg.sn);
353
354            if seg.frg == 0 {
355                break;
356            }
357        }
358        assert_eq!(cur.position() as usize, peeksize);
359
360        self.move_buf();
361
362        // fast recover
363        if self.rcv_queue.len() < self.rcv_wnd as usize && recover {
364            // ready to send back IKCP_CMD_WINS in ikcp_flush
365            // tell remote my window size
366            self.probe |= KCP_ASK_TELL;
367        }
368
369        Ok(cur.position() as usize)
370    }
371
372    /// Send bytes into buffer
373    pub fn send(&mut self, mut buf: &[u8]) -> KcpResult<usize> {
374        let mut sent_size = 0;
375
376        assert!(self.mss > 0);
377
378        // append to previous segment in streaming mode (if possible)
379        if self.stream {
380            if let Some(old) = self.snd_queue.back_mut() {
381                let l = old.data.len();
382                if l < self.mss as usize {
383                    let capacity = self.mss as usize - l;
384                    let extend = cmp::min(buf.len(), capacity);
385
386                    trace!(
387                        "send stream mss={} last length={} extend={}",
388                        self.mss,
389                        l,
390                        extend
391                    );
392
393                    let (lf, rt) = buf.split_at(extend);
394                    old.data.extend_from_slice(lf);
395                    buf = rt;
396
397                    old.frg = 0;
398                    sent_size += extend;
399                }
400
401                if buf.is_empty() {
402                    return Ok(sent_size);
403                }
404            }
405        }
406
407        let count = if buf.len() <= self.mss as usize {
408            1
409        } else {
410            (buf.len() + self.mss as usize - 1) / self.mss as usize
411        };
412
413        if count >= KCP_WND_RCV as usize {
414            debug!("send bufsize={} mss={} too large", buf.len(), self.mss);
415            return Err(Error::UserBufTooBig);
416        }
417        assert!(count > 0);
418
419        // let count = cmp::max(1, count);
420
421        for i in 0..count {
422            let size = cmp::min(self.mss as usize, buf.len());
423
424            let (lf, rt) = buf.split_at(size);
425
426            let mut new_segment = KcpSegment::new_with_data(lf.into());
427            buf = rt;
428
429            new_segment.frg = if self.stream {
430                0
431            } else {
432                (count - i - 1) as u8
433            };
434
435            self.snd_queue.push_back(new_segment);
436            sent_size += size;
437        }
438
439        Ok(sent_size)
440    }
441
442    fn update_ack(&mut self, rtt: u32) {
443        if self.rx_srtt == 0 {
444            self.rx_srtt = rtt;
445            self.rx_rttval = rtt / 2;
446        } else {
447            let delta = if rtt > self.rx_srtt {
448                rtt - self.rx_srtt
449            } else {
450                self.rx_srtt - rtt
451            };
452            self.rx_rttval = (3 * self.rx_rttval + delta) / 4;
453            self.rx_srtt = (7 * self.rx_srtt + rtt) / 8;
454            if self.rx_srtt < 1 {
455                self.rx_srtt = 1;
456            }
457        }
458        let rto = self.rx_srtt + cmp::max(self.interval, 4 * self.rx_rttval);
459        self.rx_rto = bound(self.rx_minrto, rto, KCP_RTO_MAX);
460    }
461
462    #[inline]
463    fn shrink_buf(&mut self) {
464        self.snd_una = match self.snd_buf.front() {
465            Some(seg) => seg.sn,
466            None => self.snd_nxt,
467        };
468    }
469
470    fn parse_ack(&mut self, sn: u32) {
471        if timediff(sn, self.snd_una) < 0 || timediff(sn, self.snd_nxt) >= 0 {
472            return;
473        }
474
475        for i in 0..self.snd_buf.len() {
476            match sn.cmp(&self.snd_buf[i].sn) {
477                Ordering::Equal => {
478                    self.snd_buf.remove(i);
479                }
480                Ordering::Less => break,
481                _ => (),
482            }
483        }
484    }
485
486    fn parse_una(&mut self, una: u32) {
487        while !self.snd_buf.is_empty() {
488            if timediff(una, self.snd_buf[0].sn) > 0 {
489                // self.snd_buf.remove(0);
490                self.snd_buf.pop_front();
491            } else {
492                break;
493            }
494        }
495    }
496
497    fn parse_fastack(&mut self, sn: u32) {
498        if timediff(sn, self.snd_una) < 0 || timediff(sn, self.snd_nxt) >= 0 {
499            return;
500        }
501
502        for seg in &mut self.snd_buf {
503            if timediff(sn, seg.sn) < 0 {
504                break;
505            } else if sn != seg.sn {
506                seg.fastack += 1;
507            }
508        }
509    }
510
    /// Queue an acknowledgement for segment `sn`, echoing the sender's
    /// timestamp `ts`; `_flush_ack` later encodes and clears the queued pairs.
511    #[inline]
512    fn ack_push(&mut self, sn: u32, ts: u32) {
513        self.acklist.push_back((sn, ts));
514    }
515
516    fn parse_data(&mut self, new_segment: KcpSegment) {
517        let sn = new_segment.sn;
518
519        if timediff(sn, self.rcv_nxt + self.rcv_wnd as u32) >= 0 || timediff(sn, self.rcv_nxt) < 0 {
520            return;
521        }
522
523        let mut repeat = false;
524        let mut new_index = self.rcv_buf.len();
525
526        for segment in self.rcv_buf.iter().rev() {
527            if segment.sn == sn {
528                repeat = true;
529                break;
530            } else if timediff(sn, segment.sn) > 0 {
531                break;
532            }
533            new_index -= 1;
534        }
535
536        if !repeat {
537            self.rcv_buf.insert(new_index, new_segment);
538        }
539
540        // move available data from rcv_buf -> rcv_queue
541        self.move_buf();
542    }
543
544    /// Get `conv` from the next `input` call
    ///
    /// Sets a one-shot flag: when the next `input` sees a segment whose
    /// `conv` differs from the current one, it adopts that value and clears
    /// the flag instead of returning `Error::ConvInconsistent`.
545    #[inline]
546    pub fn input_conv(&mut self) {
547        self.input_conv = true;
548    }
549
550    /// Check if Kcp is waiting for the next input
    ///
    /// Returns `true` while the flag set by `input_conv()` is still armed,
    /// i.e. `input` has not yet adopted a `conv` from an incoming segment.
551    #[inline]
552    pub fn waiting_conv(&self) -> bool {
553        self.input_conv
554    }
555
556    /// Set `conv` value
    ///
    /// Replaces the conversation id that `input` checks incoming segments
    /// against and that `flush` stamps on outgoing segments.
557    #[inline]
558    pub fn set_conv(&mut self, conv: u32) {
559        self.conv = conv;
560    }
561
562    /// Get `conv`
    ///
    /// Returns the conversation id currently in use.
563    #[inline]
564    pub fn conv(&self) -> u32 {
565        self.conv
566    }
567
568    /// Call this when you received a packet from raw connection
569    pub fn input(&mut self, buf: &[u8]) -> KcpResult<usize> {
570        let input_size = buf.len();
571
572        trace!("[RI] {} bytes", buf.len());
573
574        if buf.len() < KCP_OVERHEAD {
575            debug!(
576                "input bufsize={} too small, at least {}",
577                buf.len(),
578                KCP_OVERHEAD
579            );
580            return Err(Error::InvalidSegmentSize(buf.len()));
581        }
582
583        let mut flag = false;
584        let mut max_ack = 0;
585        let old_una = self.snd_una;
586
587        let mut buf = Cursor::new(buf);
588        while buf.remaining() >= KCP_OVERHEAD as usize {
589            let conv = buf.get_u32_le();
590            if conv != self.conv {
591                // This allows getting conv from this call, which allows us to allocate
592                // conv from the server side.
593                if self.input_conv {
594                    debug!("input conv={} updated, original conv={}", conv, self.conv);
595                    self.conv = conv;
596                    self.input_conv = false;
597                } else {
598                    debug!("input conv={} expected conv={} not match", conv, self.conv);
599                    return Err(Error::ConvInconsistent(self.conv, conv));
600                }
601            }
602
603            let cmd = buf.get_u8();
604            let frg = buf.get_u8();
605            let wnd = buf.get_u16_le();
606            let ts = buf.get_u32_le();
607            let sn = buf.get_u32_le();
608            let una = buf.get_u32_le();
609            let len = buf.get_u32_le() as usize;
610
611            if buf.remaining() < len as usize {
612                debug!(
613                    "input bufsize={} payload length={} remaining={} not match",
614                    input_size,
615                    len,
616                    buf.remaining()
617                );
618                return Err(Error::InvalidSegmentDataSize(len, buf.remaining()));
619            }
620
621            match cmd {
622                KCP_CMD_PUSH | KCP_CMD_ACK | KCP_CMD_WASK | KCP_CMD_WINS => {}
623                _ => {
624                    debug!("input cmd={} unrecognized", cmd);
625                    return Err(Error::UnsupportedCmd(cmd));
626                }
627            }
628
629            self.rmt_wnd = wnd;
630
631            self.parse_una(una);
632            self.shrink_buf();
633
634            let mut has_read_data = false;
635
636            match cmd {
637                KCP_CMD_ACK => {
638                    let rtt = timediff(self.current, ts);
639                    if rtt >= 0 {
640                        self.update_ack(rtt as u32);
641                    }
642                    self.parse_ack(sn);
643                    self.shrink_buf();
644
645                    if !flag {
646                        max_ack = sn;
647                        flag = true;
648                    } else if timediff(sn, max_ack) > 0 {
649                        max_ack = sn;
650                    }
651
652                    trace!(
653                        "input ack: sn={} rtt={} rto={}",
654                        sn,
655                        timediff(self.current, ts),
656                        self.rx_rto
657                    );
658                }
659                KCP_CMD_PUSH => {
660                    trace!("input psh: sn={} ts={}", sn, ts);
661
662                    if timediff(sn, self.rcv_nxt + self.rcv_wnd as u32) < 0 {
663                        self.ack_push(sn, ts);
664                        if timediff(sn, self.rcv_nxt) >= 0 {
665                            let mut sbuf = BytesMut::with_capacity(len as usize);
666                            unsafe {
667                                sbuf.set_len(len as usize);
668                            }
669                            buf.read_exact(&mut sbuf).unwrap();
670                            has_read_data = true;
671
672                            let mut segment = KcpSegment::new_with_data(sbuf);
673
674                            segment.conv = conv;
675                            segment.cmd = cmd;
676                            segment.frg = frg;
677                            segment.wnd = wnd;
678                            segment.ts = ts;
679                            segment.sn = sn;
680                            segment.una = una;
681
682                            self.parse_data(segment);
683                        }
684                    }
685                }
686                KCP_CMD_WASK => {
687                    trace!("input probe");
688                    self.probe |= KCP_ASK_TELL;
689                }
690                KCP_CMD_WINS => {
691                    // Do nothing
692                    trace!("input wins: {}", wnd);
693                }
694                _ => unreachable!(),
695            }
696
697            // Force skip unread data
698            if !has_read_data {
699                let next_pos = buf.position() + len as u64;
700                buf.set_position(next_pos);
701            }
702        }
703
704        if flag {
705            self.parse_fastack(max_ack);
706        }
707
708        if self.snd_una > old_una && self.cwnd < self.rmt_wnd {
709            let mss = self.mss;
710            if self.cwnd < self.ssthresh {
711                self.cwnd += 1;
712                self.incr += mss;
713            } else {
714                if self.incr < mss {
715                    self.incr = mss;
716                }
717                self.incr += (mss * mss) / self.incr + (mss / 16);
718                if (self.cwnd + 1) as u32 * mss <= self.incr {
719                    self.cwnd += 1;
720                }
721            }
722            if self.cwnd > self.rmt_wnd {
723                self.cwnd = self.rmt_wnd;
724                self.incr = self.rmt_wnd as u32 * mss;
725            }
726        }
727
728        Ok(buf.position() as usize)
729    }
730
731    fn wnd_unused(&self) -> u16 {
732        if self.rcv_queue.len() < self.rcv_wnd as usize {
733            self.rcv_wnd - self.rcv_queue.len() as u16
734        } else {
735            0
736        }
737    }
738
739    fn _flush_ack(&mut self, segment: &mut KcpSegment) -> KcpResult<()> {
740        // flush acknowledges
741        // while let Some((sn, ts)) = self.acklist.pop_front() {
742        for &(sn, ts) in &self.acklist {
743            if self.buf.len() + KCP_OVERHEAD > self.mtu as usize {
744                (self.output)(&self.buf)?;
745                self.buf.clear();
746            }
747            segment.sn = sn;
748            segment.ts = ts;
749            segment.encode(&mut self.buf);
750        }
751        self.acklist.clear();
752
753        Ok(())
754    }
755
756    fn probe_wnd_size(&mut self) {
757        // probe window size (if remote window size equals zero)
758        if self.rmt_wnd == 0 {
759            if self.probe_wait == 0 {
760                self.probe_wait = KCP_PROBE_INIT;
761                self.ts_probe = self.current + self.probe_wait;
762            } else {
763                if timediff(self.current, self.ts_probe) >= 0 && self.probe_wait < KCP_PROBE_INIT {
764                    self.probe_wait = KCP_PROBE_INIT;
765                }
766                self.probe_wait += self.probe_wait / 2;
767                if self.probe_wait > KCP_PROBE_LIMIT {
768                    self.probe_wait = KCP_PROBE_LIMIT;
769                }
770                self.ts_probe = self.current + self.probe_wait;
771                self.probe |= KCP_ASK_SEND;
772            }
773        } else {
774            self.ts_probe = 0;
775            self.probe_wait = 0;
776        }
777    }
778
779    fn _flush_probe_commands(&mut self, cmd: u8, segment: &mut KcpSegment) -> KcpResult<()> {
780        segment.cmd = cmd;
781        if self.buf.len() + KCP_OVERHEAD > self.mtu as usize {
782            (self.output)(&self.buf)?;
783            self.buf.clear();
784        }
785        segment.encode(&mut self.buf);
786        Ok(())
787    }
788
789    fn flush_probe_commands(&mut self, segment: &mut KcpSegment) -> KcpResult<()> {
790        // flush window probing commands
791        if (self.probe & KCP_ASK_SEND) != 0 {
792            self._flush_probe_commands(KCP_CMD_WASK, segment)?;
793        }
794
795        // flush window probing commands
796        if (self.probe & KCP_ASK_TELL) != 0 {
797            self._flush_probe_commands(KCP_CMD_WINS, segment)?;
798        }
799        self.probe = 0;
800        Ok(())
801    }
802
803    /// Flush pending ACKs
804    pub fn flush_ack(&mut self) -> KcpResult<()> {
805        if !self.updated {
806            debug!("flush updated() must be called at least once");
807            return Err(Error::NeedUpdate);
808        }
809
810        let mut segment = KcpSegment {
811            conv: self.conv,
812            cmd: KCP_CMD_ACK,
813            wnd: self.wnd_unused(),
814            una: self.rcv_nxt,
815            ..Default::default()
816        };
817
818        self._flush_ack(&mut segment)
819    }
820
821    /// Flush pending data in buffer.
822    pub fn flush(&mut self) -> KcpResult<()> {
823        if !self.updated {
824            debug!("flush updated() must be called at least once");
825            return Err(Error::NeedUpdate);
826        }
827
828        let mut segment = KcpSegment {
829            conv: self.conv,
830            cmd: KCP_CMD_ACK,
831            wnd: self.wnd_unused(),
832            una: self.rcv_nxt,
833            ..Default::default()
834        };
835
836        self._flush_ack(&mut segment)?;
837        self.probe_wnd_size();
838        self.flush_probe_commands(&mut segment)?;
839
840        // println!("SNDBUF size {}", self.snd_buf.len());
841
842        // calculate window size
843        let mut cwnd = cmp::min(self.snd_wnd, self.rmt_wnd);
844        if !self.nocwnd {
845            cwnd = cmp::min(self.cwnd, cwnd);
846        }
847
848        // move data from snd_queue to snd_buf
849        while timediff(self.snd_nxt, self.snd_una + cwnd as u32) < 0 {
850            match self.snd_queue.pop_front() {
851                Some(mut new_segment) => {
852                    new_segment.conv = self.conv;
853                    new_segment.cmd = KCP_CMD_PUSH;
854                    new_segment.wnd = segment.wnd;
855                    new_segment.ts = self.current;
856                    new_segment.sn = self.snd_nxt;
857                    self.snd_nxt += 1;
858                    new_segment.una = self.rcv_nxt;
859                    new_segment.resendts = self.current;
860                    new_segment.rto = self.rx_rto;
861                    new_segment.fastack = 0;
862                    new_segment.xmit = 0;
863                    self.snd_buf.push_back(new_segment);
864                }
865                None => break,
866            }
867        }
868
869        // calculate resent
870        let resent = if self.fastresend > 0 {
871            self.fastresend
872        } else {
873            u32::max_value()
874        };
875
876        let rtomin = if !self.nodelay { self.rx_rto >> 3 } else { 0 };
877
878        let mut lost = false;
879        let mut change = 0;
880
881        for snd_segment in &mut self.snd_buf {
882            let mut need_send = false;
883
884            if snd_segment.xmit == 0 {
885                need_send = true;
886                snd_segment.xmit += 1;
887                snd_segment.rto = self.rx_rto;
888                snd_segment.resendts = self.current + snd_segment.rto + rtomin;
889            } else if timediff(self.current, snd_segment.resendts) >= 0 {
890                need_send = true;
891                snd_segment.xmit += 1;
892                self.xmit += 1;
893                if !self.nodelay {
894                    snd_segment.rto += self.rx_rto;
895                } else {
896                    snd_segment.rto += self.rx_rto / 2;
897                }
898                snd_segment.resendts = self.current + snd_segment.rto;
899                lost = true;
900            } else if snd_segment.fastack >= resent {
901                need_send = true;
902                snd_segment.xmit += 1;
903                snd_segment.fastack = 0;
904                snd_segment.resendts = self.current + snd_segment.rto;
905                change += 1;
906            }
907
908            if need_send {
909                snd_segment.ts = self.current;
910                snd_segment.wnd = segment.wnd;
911                snd_segment.una = self.rcv_nxt;
912
913                let need = KCP_OVERHEAD + snd_segment.data.len();
914
915                if self.buf.len() + need > self.mtu as usize {
916                    (self.output)(&self.buf)?;
917                    self.buf.clear();
918                }
919
920                snd_segment.encode(&mut self.buf);
921
922                if snd_segment.xmit >= self.dead_link {
923                    self.state = -1;
924                }
925            }
926        }
927
928        // Flush all data in buffer
929        if !self.buf.is_empty() {
930            (self.output)(&self.buf)?;
931            self.buf.clear();
932        }
933
934        // update ssthresh
935        if change > 0 {
936            let inflight = self.snd_nxt - self.snd_una;
937            self.ssthresh = inflight as u16 / 2;
938            if self.ssthresh < KCP_THRESH_MIN {
939                self.ssthresh = KCP_THRESH_MIN;
940            }
941            self.cwnd = self.ssthresh + resent as u16;
942            self.incr = self.cwnd as u32 * self.mss;
943        }
944
945        if lost {
946            self.ssthresh = cwnd / 2;
947            if self.ssthresh < KCP_THRESH_MIN {
948                self.ssthresh = KCP_THRESH_MIN;
949            }
950            self.cwnd = 1;
951            self.incr = self.mss;
952        }
953
954        if self.cwnd < 1 {
955            self.cwnd = 1;
956            self.incr = self.mss;
957        }
958
959        Ok(())
960    }
961
962    /// Update state every 10ms ~ 100ms.
963    ///
964    /// Or you can ask `check` when to call this again.
965    pub fn update(&mut self, current: u32) -> KcpResult<()> {
966        self.current = current;
967
968        if !self.updated {
969            self.updated = true;
970            self.ts_flush = self.current;
971        }
972
973        let mut slap = timediff(self.current, self.ts_flush);
974
975        if slap >= 10000 || slap < -10000 {
976            self.ts_flush = self.current;
977            slap = 0;
978        }
979
980        if slap >= 0 {
981            self.ts_flush += self.interval;
982            if timediff(self.current, self.ts_flush) >= 0 {
983                self.ts_flush = self.current + self.interval;
984            }
985            self.flush()?;
986        }
987
988        Ok(())
989    }
990
991    /// Determine when you should call `update`.
992    /// Return when you should invoke `update` in millisec, if there is no `input`/`send` calling.
993    /// You can call `update` in that time without calling it repeatly.
994    pub fn check(&self, current: u32) -> u32 {
995        if !self.updated {
996            return 0;
997        }
998
999        let mut ts_flush = self.ts_flush;
1000        let mut tm_packet = u32::max_value();
1001
1002        if timediff(current, ts_flush) >= 10000 || timediff(current, ts_flush) < -10000 {
1003            ts_flush = current;
1004        }
1005
1006        if timediff(current, ts_flush) >= 0 {
1007            // return self.interval;
1008            return 0;
1009        }
1010
1011        let tm_flush = timediff(ts_flush, current) as u32;
1012        for seg in &self.snd_buf {
1013            let diff = timediff(seg.resendts, current);
1014            if diff <= 0 {
1015                // return self.interval;
1016                return 0;
1017            }
1018            if (diff as u32) < tm_packet {
1019                tm_packet = diff as u32;
1020            }
1021        }
1022
1023        cmp::min(cmp::min(tm_packet, tm_flush), self.interval)
1024    }
1025
1026    /// Change MTU size, default is 1400
1027    ///
1028    /// MTU = Maximum Transmission Unit
1029    pub fn set_mtu(&mut self, mtu: usize) -> KcpResult<()> {
1030        if mtu < 50 || mtu < KCP_OVERHEAD {
1031            debug!("set_mtu mtu={} invalid", mtu);
1032            return Err(Error::InvalidMtu(mtu));
1033        }
1034
1035        self.mtu = mtu;
1036        self.mss = (self.mtu - KCP_OVERHEAD) as u32;
1037
1038        let additional = ((mtu + KCP_OVERHEAD) * 3) as isize - self.buf.capacity() as isize;
1039        if additional > 0 {
1040            self.buf.reserve(additional as usize);
1041        }
1042
1043        Ok(())
1044    }
1045
1046    /// Get MTU
1047    pub fn mtu(&self) -> usize {
1048        self.mtu
1049    }
1050
1051    /// Set check interval
1052    pub fn set_interval(&mut self, mut interval: u32) {
1053        if interval > 5000 {
1054            interval = 5000;
1055        } else if interval < 10 {
1056            interval = 10;
1057        }
1058        self.interval = interval;
1059    }
1060
1061    /// Set nodelay
1062    ///
1063    /// fastest config: nodelay(true, 20, 2, true)
1064    ///
1065    /// `nodelay`: default is disable (false)
1066    /// `interval`: internal update timer interval in millisec, default is 100ms
1067    /// `resend`: 0:disable fast resend(default), 1:enable fast resend
1068    /// `nc`: `false`: normal congestion control(default), `true`: disable congestion control
1069    pub fn set_nodelay(&mut self, nodelay: bool, mut interval: i32, resend: i32, nc: bool) {
1070        if nodelay {
1071            self.nodelay = true;
1072            self.rx_minrto = KCP_RTO_NDL;
1073        } else {
1074            self.nodelay = false;
1075            self.rx_minrto = KCP_RTO_MIN;
1076        }
1077
1078        if interval >= 0 {
1079            if interval > 5000 {
1080                interval = 5000;
1081            } else if interval < 10 {
1082                interval = 10;
1083            }
1084
1085            self.interval = interval as u32;
1086        }
1087
1088        if resend >= 0 {
1089            self.fastresend = resend as u32;
1090        }
1091
1092        self.nocwnd = nc;
1093    }
1094
1095    /// Set `wndsize`
1096    /// set maximum window size: `sndwnd=32`, `rcvwnd=32` by default
1097    pub fn set_wndsize(&mut self, sndwnd: u16, rcvwnd: u16) {
1098        if sndwnd > 0 {
1099            self.snd_wnd = sndwnd as u16;
1100        }
1101
1102        if rcvwnd > 0 {
1103            self.rcv_wnd = cmp::max(rcvwnd, KCP_WND_RCV) as u16;
1104        }
1105    }
1106
1107    /// `snd_wnd` Send window
1108    pub fn snd_wnd(&self) -> u16 {
1109        self.snd_wnd
1110    }
1111
1112    /// `rcv_wnd` Receive window
1113    pub fn rcv_wnd(&self) -> u16 {
1114        self.rcv_wnd
1115    }
1116
1117    /// Get `waitsnd`, how many packet is waiting to be sent
1118    pub fn wait_snd(&self) -> usize {
1119        self.snd_buf.len() + self.snd_queue.len()
1120    }
1121
1122    /// Set `rx_minrto`
1123    pub fn set_rx_minrto(&mut self, rto: u32) {
1124        self.rx_minrto = rto;
1125    }
1126
1127    /// Set `fastresend`
1128    pub fn set_fast_resend(&mut self, fr: u32) {
1129        self.fastresend = fr;
1130    }
1131
1132    /// KCP header size
1133    pub fn header_len() -> usize {
1134        KCP_OVERHEAD as usize
1135    }
1136
1137    /// Enabled stream or not
1138    pub fn is_stream(&self) -> bool {
1139        self.stream
1140    }
1141
1142    /// Maximum Segment Size
1143    pub fn mss(&self) -> u32 {
1144        self.mss
1145    }
1146
1147    /// Set maximum resend times
1148    pub fn set_maximum_resend_times(&mut self, dead_link: u32) {
1149        self.dead_link = dead_link;
1150    }
1151
1152    /// Check if KCP connection is dead (resend times excceeded)
1153    pub fn is_dead_link(&self) -> bool {
1154        self.state != 0
1155    }
1156}