kcp/
stream.rs

1pub use crate::config::*;
2use crate::halfclose::*;
3use crate::protocol::Kcp;
4
5use ::bytes::{BufMut, Bytes, BytesMut};
6use ::futures::{future::poll_fn, ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
7use ::log::{error, trace, warn};
8use ::std::{
9    fmt::Display,
10    io,
11    marker::PhantomData,
12    ops::Deref,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16    time::Duration,
17};
18use ::tokio::{
19    io::{AsyncRead, AsyncWrite, ReadBuf},
20    select,
21    sync::mpsc::{channel, error::TryRecvError, Receiver, Sender},
22    task::JoinHandle,
23    time::sleep,
24};
25use ::tokio_util::sync::{CancellationToken, PollSender};
26
// Local stand-in for `log::debug!` that currently expands to nothing.
//
// The forwarding call to `log::debug!` is commented out in the expansion, so
// every `debug!(...)` invocation in this file compiles away and its arguments
// are not evaluated. Un-comment the line below to get the messages back.
macro_rules! debug {
    ($($x:expr),* $(,)?) => {
        //log::debug!($($x),*)
    };
}
32
/// User-facing handle of a KCP connection.
///
/// The protocol work is done by a background task spawned in `KcpStream::new`;
/// this handle talks to it through two channels and a cancellation token.
pub struct KcpStream {
    /// Shared configuration, also handed to the background task.
    config: Arc<KcpConfig>,
    /// Conversation id; starts at 0 and is set once the task reports `Output::Connected`.
    conv: u32,
    /// Sending side of the channel carrying outgoing frames to the background task.
    input_sink: PollSender<Bytes>,
    /// Receiving side of the channel carrying `Output` events from the background task.
    output_rx: Receiver<Output>,
    /// Cancelled in `try_close` to make the background task leave its loop.
    token: CancellationToken,
    /// Join handle of the background task; taken once `poll_close` has seen it finish.
    task: Option<JoinHandle<()>>,
    // for AsyncRead
    /// Remainder of a frame that did not fit into the caller's read buffer.
    read_buf: Option<Bytes>,
}
43
44impl KcpStream {
45    /// Read the session id from the SYN handshake packet.
46    pub fn read_session_id<'a>(packet: &'a [u8], session_key: &[u8]) -> Option<&'a [u8]> {
47        Kcp::read_payload_data(packet).and_then(|x| x.strip_prefix(session_key))
48    }
49}
50
51impl KcpStream {
52    pub async fn connect<T, Si, D>(
53        config: Arc<KcpConfig>,
54        transport: T,
55        disconnect: D,
56        token: Option<CancellationToken>,
57    ) -> io::Result<Self>
58    where
59        T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
60        Si: From<BytesMut> + Send + Unpin + 'static,
61        <T as Sink<Si>>::Error: Display,
62        D: Sink<u32> + Send + Unpin + 'static,
63    {
64        Self::new(config.clone(), Kcp::SYN_CONV, transport, disconnect, token)
65            .wait_connection()
66            .await
67    }
68
69    pub async fn accept<T, Si, D>(
70        config: Arc<KcpConfig>,
71        conv: u32,
72        transport: T,
73        disconnect: D,
74        token: Option<CancellationToken>,
75    ) -> io::Result<Self>
76    where
77        T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
78        Si: From<BytesMut> + Send + Unpin + 'static,
79        <T as Sink<Si>>::Error: Display,
80        D: Sink<u32> + Send + Unpin + 'static,
81    {
82        if !Kcp::is_valid_conv(conv) {
83            error!("Invalid conv 0x{:08X}", conv);
84            return Err(io::ErrorKind::InvalidInput.into());
85        }
86        Self::new(config.clone(), conv, transport, disconnect, token)
87            .wait_connection()
88            .await
89    }
90
91    #[inline]
92    pub fn config(&self) -> Arc<KcpConfig> {
93        self.config.clone()
94    }
95
96    #[inline]
97    pub fn conv(&self) -> u32 {
98        self.conv
99    }
100
101    /// Shutdown the input channel without awaiting.
102    pub fn shutdown_immediately(&mut self) {
103        self.input_sink.abort_send();
104        self.input_sink.close();
105    }
106}
107
108impl KcpStream {
109    fn new<T, Si, D>(
110        config: Arc<KcpConfig>,
111        conv: u32,
112        transport: T,
113        disconnect: D,
114        token: Option<CancellationToken>,
115    ) -> Self
116    where
117        T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
118        Si: From<BytesMut> + Send + Unpin + 'static,
119        <T as Sink<Si>>::Error: Display,
120        D: Sink<u32> + Send + Unpin + 'static,
121    {
122        let token = token.unwrap_or_default();
123        let (input_tx, input_rx) = channel(config.snd_wnd.max(8) as usize);
124        let (output_tx, output_rx) = channel(config.rcv_wnd.max(16) as usize);
125
126        Self {
127            config: config.clone(),
128            conv: 0,
129            input_sink: PollSender::new(input_tx),
130            output_rx,
131            token: token.clone(),
132            task: Some(tokio::spawn(
133                Task {
134                    kcp: Kcp::new(conv),
135                    config,
136                    input_rx,
137                    token,
138                    rx_buf: BytesMut::new(),
139                    hs: Handshake::Syn,
140                    flags: 0,
141                    hs_end_time: 0,
142                    last_io_time: 0,
143                    session_id: Default::default(),
144                    _phantom: Default::default(),
145                }
146                .run(output_tx, transport, disconnect),
147            )),
148            read_buf: None,
149        }
150    }
151
152    async fn wait_connection(mut self) -> io::Result<Self> {
153        let rst = match self.output_rx.recv().await {
154            Some(Output::Connected { conv }) => {
155                trace!("Connect conv {}", conv);
156                self.conv = conv;
157                return Ok(self);
158            }
159            _ => Err(io::ErrorKind::TimedOut.into()),
160        };
161        self.close().await.ok();
162        rst
163    }
164
165    fn try_close(&mut self) {
166        self.token.cancel();
167        self.output_rx.close();
168    }
169}
170
171impl Drop for KcpStream {
172    fn drop(&mut self) {
173        self.try_close();
174    }
175}
176
177impl Stream for KcpStream {
178    type Item = io::Result<Bytes>;
179
180    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181        if self.read_buf.is_some() {
182            return Poll::Ready(Some(Ok(self.read_buf.take().unwrap())));
183        }
184        while let Some(x) = ready!(self.output_rx.poll_recv(cx)) {
185            if let Output::Frame(frame) = x {
186                return Poll::Ready(Some(Ok(frame)));
187            }
188        }
189        Poll::Ready(None)
190    }
191}
192
193impl Sink<Bytes> for KcpStream {
194    type Error = io::Error;
195
196    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
197        self.get_mut()
198            .input_sink
199            .poll_ready_unpin(cx)
200            .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
201    }
202
203    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204        Poll::Ready(Ok(()))
205    }
206
207    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
208        self.get_mut()
209            .input_sink
210            .start_send_unpin(item)
211            .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
212    }
213
214    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215        let this = self.get_mut();
216        let _ = ready!(this.input_sink.poll_close_unpin(cx));
217        this.try_close();
218        if let Some(task) = this.task.as_mut() {
219            let _ = ready!(task.poll_unpin(cx));
220            this.task.take();
221        }
222        Poll::Ready(Ok(()))
223    }
224}
225
226impl AsyncRead for KcpStream {
227    fn poll_read(
228        self: Pin<&mut Self>,
229        cx: &mut Context<'_>,
230        buf: &mut ReadBuf<'_>,
231    ) -> Poll<io::Result<()>> {
232        if buf.remaining() == 0 {
233            return Poll::Ready(Ok(()));
234        }
235
236        let this = self.get_mut();
237
238        while this.read_buf.is_none() {
239            match this.poll_next_unpin(cx) {
240                Poll::Ready(Some(Ok(chunk))) => {
241                    if !chunk.is_empty() {
242                        this.read_buf = Some(chunk);
243                    }
244                }
245                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
246                Poll::Ready(None) => return Poll::Ready(Ok(())),
247                Poll::Pending => return Poll::Pending,
248            }
249        }
250
251        if let Some(ref mut chunk) = this.read_buf {
252            let len = chunk.len().min(buf.remaining());
253            buf.put_slice(&chunk[..len]);
254            if len < chunk.len() {
255                let _ = chunk.split_to(len);
256            } else {
257                this.read_buf = None;
258            }
259        }
260
261        Poll::Ready(Ok(()))
262    }
263}
264
265impl AsyncWrite for KcpStream {
266    fn poll_write(
267        self: Pin<&mut Self>,
268        cx: &mut Context<'_>,
269        buf: &[u8],
270    ) -> Poll<Result<usize, io::Error>> {
271        if buf.is_empty() {
272            return Poll::Ready(Ok(0));
273        }
274
275        let this = self.get_mut();
276        let mut len = 0;
277
278        for chunk in buf.chunks(Kcp::max_frame_size(this.config.mtu) as usize) {
279            match this.poll_ready_unpin(cx) {
280                Poll::Ready(Ok(_)) => {
281                    if let Err(e) = this.start_send_unpin(Bytes::copy_from_slice(chunk)) {
282                        if len > 0 {
283                            break;
284                        }
285                        return Poll::Ready(Err(e));
286                    }
287                    len += chunk.len();
288                }
289                Poll::Ready(Err(e)) => {
290                    if len > 0 {
291                        break;
292                    }
293                    return Poll::Ready(Err(e));
294                }
295                Poll::Pending => {
296                    if len > 0 {
297                        break;
298                    }
299                    return Poll::Pending;
300                }
301            }
302        }
303
304        Poll::Ready(Ok(len))
305    }
306
307    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
308        Poll::Ready(Ok(()))
309    }
310
311    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
312        self.poll_close(cx)
313    }
314}
315
316////////////////////////////////////////////////////////////////////////////////
317
/// Events sent from the background task to the `KcpStream` handle.
#[derive(Debug)]
enum Output {
    /// The SYN handshake has completed; carries the conv of the session.
    Connected { conv: u32 },
    /// One received data frame.
    Frame(Bytes),
}
323
/// Connection state of the background task.
///
/// The declaration order matters: `PartialOrd` is derived and
/// `is_fin_handshake` tests the state against a range of variants.
#[derive(Clone, Copy, PartialOrd, PartialEq, Eq, Debug)]
enum Handshake {
    /// Initial state: the SYN handshake has not completed yet.
    Syn,
    /// The SYN handshake succeeded; data can flow.
    Connected,
    /// Shutdown has started; waiting for the input to be closed and the send queue to drain.
    FinPending,
    /// The FIN frame has been queued; waiting for the send queue to drain.
    FinSent,
    /// Waiting for the peer's FIN and for the receive queues to become empty.
    FinWaitPeer,
    /// Terminal state; the task loop exits when it sees it.
    Disconnected,
}
333
/// State owned by the background task that drives one KCP session.
struct Task<T, Si> {
    /// KCP protocol context.
    kcp: Kcp,
    /// Shared configuration.
    config: Arc<KcpConfig>,
    /// Frames submitted by the `KcpStream` handle for sending.
    input_rx: Receiver<Bytes>,
    /// Cancellation token shared with the handle.
    token: CancellationToken,
    /// Scratch buffer, sized to three MTUs in `kcp_apply_config`.
    rx_buf: BytesMut,

    /// Current handshake / connection state.
    hs: Handshake,
    /// Bit set of the `CLIENT` / `FLUSH` / `FIN_RECVED` / `INPUT_CLOSED` flags.
    flags: u32,
    /// Deadline (KCP system time, in ms) for the SYN or FIN handshake in progress.
    hs_end_time: u32,
    /// KCP time of the last flush, used for session expiry.
    last_io_time: u32,
    /// Session id exchanged in the SYN frame and checked against the FIN frame.
    session_id: Vec<u8>,
    /// Ties the task to its transport type `T` and sink item type `Si`.
    _phantom: PhantomData<(T, Si)>,
}
348
349impl<T, Si> Task<T, Si> {
350    const CLIENT: u32 = 0x01;
351    const FLUSH: u32 = 0x02;
352    const FIN_RECVED: u32 = 0x04;
353    const INPUT_CLOSED: u32 = 0x08;
354
355    const CMD_MASK: u8 = 0x57;
356    const KCP_SYN: u8 = 0x80;
357    const KCP_FIN: u8 = 0x20;
358    const KCP_RESET: u8 = 0x08;
359
360    #[inline]
361    fn is_client(&self) -> bool {
362        self.flags & Self::CLIENT != 0
363    }
364}
365
366impl<T, Si> std::fmt::Display for Task<T, Si> {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        write!(
369            f,
370            "KcpStream({}-{} {:?})",
371            self.kcp.conv(),
372            self.is_client() as i32,
373            self.hs
374        )
375    }
376}
377
378impl<T, Si> Task<T, Si>
379where
380    T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
381    Si: From<BytesMut> + Send + Unpin + 'static,
382    <T as Sink<Si>>::Error: Display,
383{
384    async fn run<D>(mut self, output_tx: Sender<Output>, mut transport: T, mut disconnect: D)
385    where
386        D: Sink<u32> + Send + Unpin,
387    {
388        self.kcp.initialize();
389        self.kcp_apply_config();
390
391        // Prepare for SYN handshake.
392        self.hs = Handshake::Syn;
393        self.hs_end_time =
394            self.kcp.get_system_time() + self.config.connect_timeout.as_secs() as u32 * 1000;
395        self.kcp.set_nodelay(true, 100, 0, false);
396        self.last_io_time = self.kcp.current();
397        if self.kcp.conv() == Kcp::SYN_CONV {
398            self.flags |= Self::CLIENT;
399            // Create random session ID for handshake.
400            self.session_id = self.config.random_session_id();
401            // Send SYN packet.
402            self.kcp.set_conv(Kcp::SYN_CONV);
403            self.handshake_send();
404        }
405
406        let mut tx_frame: Option<Bytes> = None;
407        loop {
408            if self.kcp.has_ouput() {
409                select! {
410                    biased;
411                    x = Self::kcp_output(&mut self.kcp, &mut transport, self.hs) => {
412                        if x.is_err() { break; }
413                    }
414                    _ = self.token.cancelled() => break,
415                }
416            }
417
418            if self.hs == Handshake::Disconnected {
419                break;
420            }
421
422            if self.kcp.is_dead_link()
423                || self.kcp.duration_since(self.last_io_time)
424                    >= (self.config.session_expire.as_secs() * 1000) as u32
425            {
426                // TODO: dead link
427                break;
428            }
429
430            let current = self.kcp.get_system_time();
431            let mut interval = self.kcp.check(current).wrapping_sub(current).min(60000);
432            if self.hs != Handshake::Connected {
433                let timeout = self.hs_end_time.wrapping_sub(current);
434                if timeout as i32 > 0 {
435                    interval = interval.min(timeout);
436                } else {
437                    // SYN / FIN handshake is timed-out.
438                    self.set_handshake(Handshake::Disconnected);
439                    continue;
440                }
441            }
442            if interval == 0 {
443                self.kcp.update(current);
444                continue;
445            }
446
447            select! {
448                x = self.input_rx.recv(), if !self.has(Self::INPUT_CLOSED)
449                        && !self.kcp.is_send_queue_full() => {
450                    let mut closed = false;
451                    match x {
452                        Some(frame) => {
453                            self.process_input(frame);
454                            // Try to process more.
455                            while !self.kcp.is_send_queue_full() {
456                                match self.input_rx.try_recv() {
457                                    Ok(frame) => self.process_input(frame),
458                                    Err(TryRecvError::Empty) => break,
459                                    Err(TryRecvError::Disconnected) => {
460                                        closed = true;
461                                        break;
462                                    }
463                                }
464                            }
465                        }
466                        _ => closed = true,
467                    }
468                    if closed {
469                        debug!("{} input channel has been closed, start FIN handshake", &self);
470                        self.flags |= Self::INPUT_CLOSED;
471                        self.fin_transit_state();
472                    }
473                    // Try to flush.
474                    self.kcp_flush();
475                }
476
477                x = output_tx.reserve(), if tx_frame.is_some() => {
478                    match x {
479                        Ok(permit) => {
480                            if let Some(frame) = tx_frame.take() {
481                                permit.send(Output::Frame(frame));
482                            }
483                            while let Some(frame) = self.kcp_recv() {
484                                match output_tx.try_reserve() {
485                                    Ok(permit) => permit.send(Output::Frame(frame)),
486                                    _ => {
487                                        // Save the data frame.
488                                        tx_frame = Some(frame);
489                                        break;
490                                    }
491                                }
492                            }
493                        }
494                        _ => break,
495                    }
496                }
497
498                x = transport.next() => {
499                    match x {
500                        Some(packet) => {
501                            self.kcp_input(packet);
502                            // Try to receive more.
503                            poll_fn(|cx| {
504                                for _ in 1..self.config.rcv_wnd {
505                                    match transport.poll_next_unpin(cx) {
506                                        Poll::Ready(Some(packet)) => self.kcp_input(packet),
507                                        _ => break,
508                                    }
509                                }
510                                Poll::Ready(())
511                            }).await;
512
513                            if self.hs == Handshake::Syn {
514                                if self.syn_handshake_recv(&output_tx).is_err() {
515                                    self.set_handshake(Handshake::Disconnected);
516                                }
517                            } else if self.is_fin_handshake() {
518                                // FIN handshake.
519                                self.fin_transit_state();
520                            }
521
522                            // Try to fetch a frame.
523                            if tx_frame.is_none() {
524                                tx_frame = self.kcp_recv();
525                            }
526                            // Try to flush.
527                            self.kcp_flush();
528                        }
529                        _ => break,
530                    }
531                }
532
533                _ = sleep(Duration::from_millis(interval as u64)) => (),
534                _ = self.token.cancelled() => break,
535            }
536        }
537
538        debug!("{} break loop", &self);
539
540        tx_frame.take();
541        drop(output_tx);
542
543        // Close and drain the input queue.
544        self.input_rx.close();
545        while self.input_rx.recv().await.is_some() {}
546
547        let conv = self.kcp.conv();
548        if Kcp::is_valid_conv(conv) {
549            let mut buf = BytesMut::new();
550            self.kcp.write_ack_head(&mut buf, Self::KCP_RESET, 0);
551            let half_close_timeout = self.config.half_close_timeout;
552            // Free the KCP context.
553            drop(self);
554            HalfClosePool::create_task(transport, buf, half_close_timeout).await;
555        }
556
557        // Report disconnect.
558        disconnect.send(conv).await.ok();
559    }
560
561    #[inline]
562    fn has(&self, state: u32) -> bool {
563        self.flags & state != 0
564    }
565
566    fn handshake_send(&mut self) {
567        self.flags |= Self::FLUSH;
568        let mut syn = Vec::<u8>::new();
569        syn.put_slice(&self.config.session_key);
570        syn.put_slice(&self.session_id);
571        self.kcp.send(syn.as_slice()).unwrap();
572        self.kcp_flush();
573    }
574
575    fn syn_handshake_recv(&mut self, output_tx: &Sender<Output>) -> Result<(), ()> {
576        // Check SYN frame.
577        let syn = match self.kcp_recv() {
578            Some(x) => x,
579            _ => return Ok(()),
580        };
581        // Check the session key and get the session ID.
582        if let Some(session_id) = syn
583            .strip_prefix(self.config.session_key.deref())
584            .and_then(|x| {
585                if x.len() == self.config.session_id_len {
586                    Some(x)
587                } else {
588                    None
589                }
590            })
591        {
592            // Key must be consistent.
593            if self.is_client() {
594                // Verify the session ID for the client endpoint.
595                if self.session_id != session_id {
596                    return Err(());
597                }
598                self.kcp_apply_nodelay();
599            } else {
600                self.session_id = session_id.to_vec();
601                self.kcp_apply_nodelay();
602                self.handshake_send();
603            }
604
605            // Connection has been established.
606            self.set_handshake(Handshake::Connected);
607            return output_tx
608                .try_send(Output::Connected {
609                    conv: self.kcp.conv(),
610                })
611                .map_err(|_| ());
612        }
613        Err(())
614    }
615
616    fn syn_handshake_input(&mut self, packet: &[u8]) -> io::Result<()> {
617        // Switch kcp's conv to try to accept the packet.
618        if let Some(conv) = Kcp::read_conv(packet) {
619            if self.is_client() {
620                if self.hs == Handshake::Syn && conv != Kcp::SYN_CONV {
621                    // For the client endpoint.
622                    // Try to accept conv from the server.
623                    self.kcp.set_conv(conv);
624                    if self.kcp.input(packet).is_err() || self.kcp.get_waitsnd() > 0 {
625                        // Restore conv if failed.
626                        self.kcp.set_conv(Kcp::SYN_CONV);
627                    }
628                }
629                return Ok(());
630            } else if conv == Kcp::SYN_CONV {
631                // For the server endpoint.
632                let mine = self.kcp.conv();
633                // Switch conv temporarily.
634                self.kcp.set_conv(conv);
635                self.kcp.input(packet).ok();
636                // Restore conv.
637                self.kcp.set_conv(mine);
638                return Ok(());
639            }
640        }
641        Err(io::ErrorKind::InvalidInput.into())
642    }
643
644    #[inline]
645    fn is_fin_handshake(&self) -> bool {
646        (Handshake::FinPending..Handshake::FinWaitPeer).contains(&self.hs)
647    }
648
649    fn set_handshake(&mut self, hs: Handshake) {
650        if self.hs != hs {
651            debug!("{} -> {:?}", self, hs);
652            self.hs = hs;
653        }
654    }
655
656    fn fin_transit_state(&mut self) {
657        loop {
658            let state = match self.hs {
659                Handshake::Syn => Handshake::Disconnected,
660                Handshake::Connected => {
661                    self.hs_end_time = self.kcp.get_system_time()
662                        + self.config.shutdown_timeout.as_secs() as u32 * 1000;
663                    Handshake::FinPending
664                }
665                Handshake::FinPending => {
666                    debug!(
667                        "{} input closed: {}, waitsnd: {}",
668                        &self,
669                        self.has(Self::INPUT_CLOSED),
670                        self.kcp.get_waitsnd()
671                    );
672                    if !self.has(Self::INPUT_CLOSED) || self.kcp.get_waitsnd() > 0 {
673                        break;
674                    }
675                    self.handshake_send();
676                    Handshake::FinSent
677                }
678                Handshake::FinSent => {
679                    debug!("{} waitsnd: {}", self, self.kcp.get_waitsnd());
680                    if self.kcp.get_waitsnd() > 0 {
681                        break;
682                    }
683                    Handshake::FinWaitPeer
684                }
685                Handshake::FinWaitPeer => {
686                    debug!(
687                        "{} waitsnd: {} + {}",
688                        self,
689                        self.kcp.nrcv_que(),
690                        self.kcp.nrcv_buf(),
691                    );
692                    // KCP: ensure all frames have been received.
693                    if !self.has(Self::FIN_RECVED) || self.kcp.nrcv_que() + self.kcp.nrcv_buf() > 0
694                    {
695                        break;
696                    }
697                    Handshake::Disconnected
698                }
699                Handshake::Disconnected => break,
700            };
701            self.set_handshake(state);
702        }
703    }
704
705    fn process_input(&mut self, frame: Bytes) {
706        match self.kcp.send(frame.deref()) {
707            Ok(_) => {
708                // KCP: flush if it's no delay or the number of not-sent buffers is greater than 1.
709                if self.config.nodelay.nodelay || self.kcp.nsnd_que() > 1 {
710                    self.flags |= Self::FLUSH;
711                }
712            }
713            _ => error!(
714                "Too big frame size: {} > {}",
715                frame.len(),
716                Kcp::max_frame_size(self.config.mtu)
717            ),
718        }
719    }
720
721    fn kcp_apply_config(&mut self) {
722        self.kcp.set_stream(self.config.stream);
723        self.kcp.set_mtu(self.config.mtu).unwrap();
724        self.kcp
725            .set_wndsize(self.config.snd_wnd, self.config.rcv_wnd);
726        self.kcp_apply_nodelay();
727
728        // Resize buffer.
729        let size = self.config.mtu as usize * 3;
730        if self.rx_buf.len() < size {
731            self.rx_buf.resize(size, 0);
732        }
733    }
734
735    fn kcp_apply_nodelay(&mut self) {
736        self.kcp.set_nodelay(
737            self.config.nodelay.nodelay,
738            self.config.nodelay.interval,
739            self.config.nodelay.resend,
740            self.config.nodelay.nc,
741        );
742    }
743
744    fn kcp_recv(&mut self) -> Option<Bytes> {
745        // KCP: FIN frame is the last one in the stream.
746        if self.has(Self::FIN_RECVED) && self.kcp.nrcv_que() == 1 && self.kcp.nrcv_buf() == 0 {
747            // Check the session key and ID of the FIN frame.
748            if let Some(fin) = self.kcp.recv_bytes() {
749                if Some(&self.session_id[..]) == fin.strip_prefix(self.config.session_key.deref()) {
750                    debug!("{} receive FIN frame", self);
751                }
752            }
753            None
754        } else {
755            self.kcp.recv_bytes()
756        }
757    }
758
759    fn kcp_flush(&mut self) {
760        if self.has(Self::FLUSH) {
761            self.flags ^= Self::FLUSH;
762            self.kcp.update(self.kcp.get_system_time());
763            self.kcp.flush();
764            self.last_io_time = self.kcp.current();
765        }
766    }
767
768    fn kcp_input(&mut self, mut packet: BytesMut) {
769        match self.kcp.input(&packet) {
770            Ok(_) => self.flags |= Self::FLUSH,
771            Err(e) => match e.kind() {
772                io::ErrorKind::NotFound => {
773                    // SYN handshake.
774                    if self.syn_handshake_input(&packet).is_ok() {
775                        return;
776                    }
777                    trace!(
778                        "{} conv does not match: {}",
779                        self,
780                        Kcp::read_conv(&self.rx_buf).unwrap_or(0),
781                    );
782                }
783                io::ErrorKind::InvalidData => {
784                    let cmd = Kcp::read_cmd(&packet);
785                    if cmd & Self::KCP_RESET != 0 {
786                        debug!("{} receive RESET", self);
787                        self.set_handshake(Handshake::Disconnected);
788                        return;
789                    }
790                    if cmd & Self::KCP_FIN != 0 {
791                        if !self.has(Self::FIN_RECVED) {
792                            self.flags |= Self::FIN_RECVED;
793                            // Do not get any more input.
794                            self.input_rx.close();
795                        }
796                        self.fin_transit_state();
797                        Kcp::write_cmd(&mut packet, cmd ^ Self::KCP_FIN);
798                        if self.kcp.input(&packet).is_ok() {
799                            return;
800                        }
801                    }
802                    trace!("packet parse error");
803                }
804                _ => unreachable!(),
805            },
806        }
807    }
808
809    async fn kcp_output(kcp: &mut Kcp, sink: &mut T, hs: Handshake) -> Result<(), ()> {
810        while let Some(mut packet) = kcp.pop_output() {
811            if hs == Handshake::FinSent {
812                // Set KCP_FIN flag for FIN handshake.
813                if Kcp::read_payload_data(&packet).is_some() {
814                    let cmd = Kcp::read_cmd(&packet) | Self::KCP_FIN;
815                    Kcp::write_cmd(&mut packet, cmd);
816                }
817            }
818
819            if let Err(e) = sink.feed(packet.into()).await {
820                // Clear all output buffers on errors.
821                while kcp.pop_output().is_some() {}
822                warn!("send to sink: {}", e);
823                break;
824            }
825        }
826        sink.flush().await.map_err(|_| ())?;
827        Ok(())
828    }
829}