1pub use crate::config::*;
2use crate::halfclose::*;
3use crate::protocol::Kcp;
4
5use ::bytes::{BufMut, Bytes, BytesMut};
6use ::futures::{future::poll_fn, ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
7use ::log::{error, trace, warn};
8use ::std::{
9 fmt::Display,
10 io,
11 marker::PhantomData,
12 ops::Deref,
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16 time::Duration,
17};
18use ::tokio::{
19 io::{AsyncRead, AsyncWrite, ReadBuf},
20 select,
21 sync::mpsc::{channel, error::TryRecvError, Receiver, Sender},
22 task::JoinHandle,
23 time::sleep,
24};
25use ::tokio_util::sync::{CancellationToken, PollSender};
26
// Local no-op `debug!` macro: it accepts any comma-separated list of
// expressions (with an optional trailing comma) and expands to nothing, so the
// arguments passed at call sites in this file are never evaluated.
macro_rules! debug {
    ($($x:expr),* $(,)?) => {
    };
}
32
/// Handle to one KCP session.
///
/// The protocol work runs in a spawned background task; this handle talks to
/// it through a pair of bounded channels and implements `Stream`, `Sink<Bytes>`,
/// `AsyncRead` and `AsyncWrite` on top of them.
pub struct KcpStream {
    /// Shared configuration the stream was created with.
    config: Arc<KcpConfig>,
    /// Conversation id; initialised to 0 and overwritten with the value
    /// reported by the background task once the connection is established.
    conv: u32,
    /// Sending half of the channel carrying outgoing frames to the task.
    input_sink: PollSender<Bytes>,
    /// Receiving half of the channel carrying events/frames from the task.
    output_rx: Receiver<Output>,
    /// Cancellation token shared with the background task.
    token: CancellationToken,
    /// Join handle of the background task; cleared once `poll_close` has
    /// observed the task finishing.
    task: Option<JoinHandle<()>>,
    /// Unread remainder of a frame that did not fit the caller's buffer in
    /// `poll_read`.
    read_buf: Option<Bytes>,
}
43
44impl KcpStream {
45 pub fn read_session_id<'a>(packet: &'a [u8], session_key: &[u8]) -> Option<&'a [u8]> {
47 Kcp::read_payload_data(packet).and_then(|x| x.strip_prefix(session_key))
48 }
49}
50
51impl KcpStream {
52 pub async fn connect<T, Si, D>(
53 config: Arc<KcpConfig>,
54 transport: T,
55 disconnect: D,
56 token: Option<CancellationToken>,
57 ) -> io::Result<Self>
58 where
59 T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
60 Si: From<BytesMut> + Send + Unpin + 'static,
61 <T as Sink<Si>>::Error: Display,
62 D: Sink<u32> + Send + Unpin + 'static,
63 {
64 Self::new(config.clone(), Kcp::SYN_CONV, transport, disconnect, token)
65 .wait_connection()
66 .await
67 }
68
69 pub async fn accept<T, Si, D>(
70 config: Arc<KcpConfig>,
71 conv: u32,
72 transport: T,
73 disconnect: D,
74 token: Option<CancellationToken>,
75 ) -> io::Result<Self>
76 where
77 T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
78 Si: From<BytesMut> + Send + Unpin + 'static,
79 <T as Sink<Si>>::Error: Display,
80 D: Sink<u32> + Send + Unpin + 'static,
81 {
82 if !Kcp::is_valid_conv(conv) {
83 error!("Invalid conv 0x{:08X}", conv);
84 return Err(io::ErrorKind::InvalidInput.into());
85 }
86 Self::new(config.clone(), conv, transport, disconnect, token)
87 .wait_connection()
88 .await
89 }
90
91 #[inline]
92 pub fn config(&self) -> Arc<KcpConfig> {
93 self.config.clone()
94 }
95
96 #[inline]
97 pub fn conv(&self) -> u32 {
98 self.conv
99 }
100
101 pub fn shutdown_immediately(&mut self) {
103 self.input_sink.abort_send();
104 self.input_sink.close();
105 }
106}
107
108impl KcpStream {
109 fn new<T, Si, D>(
110 config: Arc<KcpConfig>,
111 conv: u32,
112 transport: T,
113 disconnect: D,
114 token: Option<CancellationToken>,
115 ) -> Self
116 where
117 T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
118 Si: From<BytesMut> + Send + Unpin + 'static,
119 <T as Sink<Si>>::Error: Display,
120 D: Sink<u32> + Send + Unpin + 'static,
121 {
122 let token = token.unwrap_or_default();
123 let (input_tx, input_rx) = channel(config.snd_wnd.max(8) as usize);
124 let (output_tx, output_rx) = channel(config.rcv_wnd.max(16) as usize);
125
126 Self {
127 config: config.clone(),
128 conv: 0,
129 input_sink: PollSender::new(input_tx),
130 output_rx,
131 token: token.clone(),
132 task: Some(tokio::spawn(
133 Task {
134 kcp: Kcp::new(conv),
135 config,
136 input_rx,
137 token,
138 rx_buf: BytesMut::new(),
139 hs: Handshake::Syn,
140 flags: 0,
141 hs_end_time: 0,
142 last_io_time: 0,
143 session_id: Default::default(),
144 _phantom: Default::default(),
145 }
146 .run(output_tx, transport, disconnect),
147 )),
148 read_buf: None,
149 }
150 }
151
152 async fn wait_connection(mut self) -> io::Result<Self> {
153 let rst = match self.output_rx.recv().await {
154 Some(Output::Connected { conv }) => {
155 trace!("Connect conv {}", conv);
156 self.conv = conv;
157 return Ok(self);
158 }
159 _ => Err(io::ErrorKind::TimedOut.into()),
160 };
161 self.close().await.ok();
162 rst
163 }
164
165 fn try_close(&mut self) {
166 self.token.cancel();
167 self.output_rx.close();
168 }
169}
170
171impl Drop for KcpStream {
172 fn drop(&mut self) {
173 self.try_close();
174 }
175}
176
177impl Stream for KcpStream {
178 type Item = io::Result<Bytes>;
179
180 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 if self.read_buf.is_some() {
182 return Poll::Ready(Some(Ok(self.read_buf.take().unwrap())));
183 }
184 while let Some(x) = ready!(self.output_rx.poll_recv(cx)) {
185 if let Output::Frame(frame) = x {
186 return Poll::Ready(Some(Ok(frame)));
187 }
188 }
189 Poll::Ready(None)
190 }
191}
192
193impl Sink<Bytes> for KcpStream {
194 type Error = io::Error;
195
196 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
197 self.get_mut()
198 .input_sink
199 .poll_ready_unpin(cx)
200 .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
201 }
202
203 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204 Poll::Ready(Ok(()))
205 }
206
207 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
208 self.get_mut()
209 .input_sink
210 .start_send_unpin(item)
211 .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
212 }
213
214 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215 let this = self.get_mut();
216 let _ = ready!(this.input_sink.poll_close_unpin(cx));
217 this.try_close();
218 if let Some(task) = this.task.as_mut() {
219 let _ = ready!(task.poll_unpin(cx));
220 this.task.take();
221 }
222 Poll::Ready(Ok(()))
223 }
224}
225
226impl AsyncRead for KcpStream {
227 fn poll_read(
228 self: Pin<&mut Self>,
229 cx: &mut Context<'_>,
230 buf: &mut ReadBuf<'_>,
231 ) -> Poll<io::Result<()>> {
232 if buf.remaining() == 0 {
233 return Poll::Ready(Ok(()));
234 }
235
236 let this = self.get_mut();
237
238 while this.read_buf.is_none() {
239 match this.poll_next_unpin(cx) {
240 Poll::Ready(Some(Ok(chunk))) => {
241 if !chunk.is_empty() {
242 this.read_buf = Some(chunk);
243 }
244 }
245 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
246 Poll::Ready(None) => return Poll::Ready(Ok(())),
247 Poll::Pending => return Poll::Pending,
248 }
249 }
250
251 if let Some(ref mut chunk) = this.read_buf {
252 let len = chunk.len().min(buf.remaining());
253 buf.put_slice(&chunk[..len]);
254 if len < chunk.len() {
255 let _ = chunk.split_to(len);
256 } else {
257 this.read_buf = None;
258 }
259 }
260
261 Poll::Ready(Ok(()))
262 }
263}
264
265impl AsyncWrite for KcpStream {
266 fn poll_write(
267 self: Pin<&mut Self>,
268 cx: &mut Context<'_>,
269 buf: &[u8],
270 ) -> Poll<Result<usize, io::Error>> {
271 if buf.is_empty() {
272 return Poll::Ready(Ok(0));
273 }
274
275 let this = self.get_mut();
276 let mut len = 0;
277
278 for chunk in buf.chunks(Kcp::max_frame_size(this.config.mtu) as usize) {
279 match this.poll_ready_unpin(cx) {
280 Poll::Ready(Ok(_)) => {
281 if let Err(e) = this.start_send_unpin(Bytes::copy_from_slice(chunk)) {
282 if len > 0 {
283 break;
284 }
285 return Poll::Ready(Err(e));
286 }
287 len += chunk.len();
288 }
289 Poll::Ready(Err(e)) => {
290 if len > 0 {
291 break;
292 }
293 return Poll::Ready(Err(e));
294 }
295 Poll::Pending => {
296 if len > 0 {
297 break;
298 }
299 return Poll::Pending;
300 }
301 }
302 }
303
304 Poll::Ready(Ok(len))
305 }
306
307 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
308 Poll::Ready(Ok(()))
309 }
310
311 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
312 self.poll_close(cx)
313 }
314}
315
/// Events sent from the background task to the `KcpStream` handle.
#[derive(Debug)]
enum Output {
    /// The handshake completed; carries the session's conversation id.
    Connected { conv: u32 },
    /// One received application frame.
    Frame(Bytes),
}
323
/// Connection life-cycle state tracked by the background task.
///
/// The declaration order is significant: the derived `PartialOrd` is used to
/// test whether a state lies within a range of states.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
enum Handshake {
    /// Initial state, before the opening handshake has completed.
    Syn,
    /// Opening handshake completed.
    Connected,
    /// Shutdown started; waiting for the local input to close and queued
    /// data to be sent.
    FinPending,
    /// Closing handshake frame sent; waiting for it to leave the send queue.
    FinSent,
    /// Waiting for the peer's FIN and for received data to be drained.
    FinWaitPeer,
    /// Terminal state.
    Disconnected,
}
333
/// State owned by the background task that drives one KCP session.
///
/// `T` is the transport type and `Si` the item type it accepts as a sink;
/// neither is stored, they are only recorded through `_phantom`.
struct Task<T, Si> {
    /// The KCP protocol instance.
    kcp: Kcp,
    /// Shared configuration.
    config: Arc<KcpConfig>,
    /// Frames submitted by the stream handle for sending.
    input_rx: Receiver<Bytes>,
    /// Cancellation token shared with the stream handle.
    token: CancellationToken,
    /// Scratch buffer; grown to three times the MTU when the config is applied.
    rx_buf: BytesMut,

    /// Current life-cycle state.
    hs: Handshake,
    /// Bit set of the `CLIENT` / `FLUSH` / `FIN_RECVED` / `INPUT_CLOSED` flags.
    flags: u32,
    /// Deadline (KCP clock) for the current non-`Connected` handshake phase.
    hs_end_time: u32,
    /// KCP clock value recorded at the last flush; used for session expiry.
    last_io_time: u32,
    /// Session id exchanged in the handshake frames.
    session_id: Vec<u8>,
    _phantom: PhantomData<(T, Si)>,
}
348
349impl<T, Si> Task<T, Si> {
350 const CLIENT: u32 = 0x01;
351 const FLUSH: u32 = 0x02;
352 const FIN_RECVED: u32 = 0x04;
353 const INPUT_CLOSED: u32 = 0x08;
354
355 const CMD_MASK: u8 = 0x57;
356 const KCP_SYN: u8 = 0x80;
357 const KCP_FIN: u8 = 0x20;
358 const KCP_RESET: u8 = 0x08;
359
360 #[inline]
361 fn is_client(&self) -> bool {
362 self.flags & Self::CLIENT != 0
363 }
364}
365
366impl<T, Si> std::fmt::Display for Task<T, Si> {
367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 write!(
369 f,
370 "KcpStream({}-{} {:?})",
371 self.kcp.conv(),
372 self.is_client() as i32,
373 self.hs
374 )
375 }
376}
377
378impl<T, Si> Task<T, Si>
379where
380 T: Sink<Si> + Stream<Item = BytesMut> + Send + Unpin + 'static,
381 Si: From<BytesMut> + Send + Unpin + 'static,
382 <T as Sink<Si>>::Error: Display,
383{
384 async fn run<D>(mut self, output_tx: Sender<Output>, mut transport: T, mut disconnect: D)
385 where
386 D: Sink<u32> + Send + Unpin,
387 {
388 self.kcp.initialize();
389 self.kcp_apply_config();
390
391 self.hs = Handshake::Syn;
393 self.hs_end_time =
394 self.kcp.get_system_time() + self.config.connect_timeout.as_secs() as u32 * 1000;
395 self.kcp.set_nodelay(true, 100, 0, false);
396 self.last_io_time = self.kcp.current();
397 if self.kcp.conv() == Kcp::SYN_CONV {
398 self.flags |= Self::CLIENT;
399 self.session_id = self.config.random_session_id();
401 self.kcp.set_conv(Kcp::SYN_CONV);
403 self.handshake_send();
404 }
405
406 let mut tx_frame: Option<Bytes> = None;
407 loop {
408 if self.kcp.has_ouput() {
409 select! {
410 biased;
411 x = Self::kcp_output(&mut self.kcp, &mut transport, self.hs) => {
412 if x.is_err() { break; }
413 }
414 _ = self.token.cancelled() => break,
415 }
416 }
417
418 if self.hs == Handshake::Disconnected {
419 break;
420 }
421
422 if self.kcp.is_dead_link()
423 || self.kcp.duration_since(self.last_io_time)
424 >= (self.config.session_expire.as_secs() * 1000) as u32
425 {
426 break;
428 }
429
430 let current = self.kcp.get_system_time();
431 let mut interval = self.kcp.check(current).wrapping_sub(current).min(60000);
432 if self.hs != Handshake::Connected {
433 let timeout = self.hs_end_time.wrapping_sub(current);
434 if timeout as i32 > 0 {
435 interval = interval.min(timeout);
436 } else {
437 self.set_handshake(Handshake::Disconnected);
439 continue;
440 }
441 }
442 if interval == 0 {
443 self.kcp.update(current);
444 continue;
445 }
446
447 select! {
448 x = self.input_rx.recv(), if !self.has(Self::INPUT_CLOSED)
449 && !self.kcp.is_send_queue_full() => {
450 let mut closed = false;
451 match x {
452 Some(frame) => {
453 self.process_input(frame);
454 while !self.kcp.is_send_queue_full() {
456 match self.input_rx.try_recv() {
457 Ok(frame) => self.process_input(frame),
458 Err(TryRecvError::Empty) => break,
459 Err(TryRecvError::Disconnected) => {
460 closed = true;
461 break;
462 }
463 }
464 }
465 }
466 _ => closed = true,
467 }
468 if closed {
469 debug!("{} input channel has been closed, start FIN handshake", &self);
470 self.flags |= Self::INPUT_CLOSED;
471 self.fin_transit_state();
472 }
473 self.kcp_flush();
475 }
476
477 x = output_tx.reserve(), if tx_frame.is_some() => {
478 match x {
479 Ok(permit) => {
480 if let Some(frame) = tx_frame.take() {
481 permit.send(Output::Frame(frame));
482 }
483 while let Some(frame) = self.kcp_recv() {
484 match output_tx.try_reserve() {
485 Ok(permit) => permit.send(Output::Frame(frame)),
486 _ => {
487 tx_frame = Some(frame);
489 break;
490 }
491 }
492 }
493 }
494 _ => break,
495 }
496 }
497
498 x = transport.next() => {
499 match x {
500 Some(packet) => {
501 self.kcp_input(packet);
502 poll_fn(|cx| {
504 for _ in 1..self.config.rcv_wnd {
505 match transport.poll_next_unpin(cx) {
506 Poll::Ready(Some(packet)) => self.kcp_input(packet),
507 _ => break,
508 }
509 }
510 Poll::Ready(())
511 }).await;
512
513 if self.hs == Handshake::Syn {
514 if self.syn_handshake_recv(&output_tx).is_err() {
515 self.set_handshake(Handshake::Disconnected);
516 }
517 } else if self.is_fin_handshake() {
518 self.fin_transit_state();
520 }
521
522 if tx_frame.is_none() {
524 tx_frame = self.kcp_recv();
525 }
526 self.kcp_flush();
528 }
529 _ => break,
530 }
531 }
532
533 _ = sleep(Duration::from_millis(interval as u64)) => (),
534 _ = self.token.cancelled() => break,
535 }
536 }
537
538 debug!("{} break loop", &self);
539
540 tx_frame.take();
541 drop(output_tx);
542
543 self.input_rx.close();
545 while self.input_rx.recv().await.is_some() {}
546
547 let conv = self.kcp.conv();
548 if Kcp::is_valid_conv(conv) {
549 let mut buf = BytesMut::new();
550 self.kcp.write_ack_head(&mut buf, Self::KCP_RESET, 0);
551 let half_close_timeout = self.config.half_close_timeout;
552 drop(self);
554 HalfClosePool::create_task(transport, buf, half_close_timeout).await;
555 }
556
557 disconnect.send(conv).await.ok();
559 }
560
561 #[inline]
562 fn has(&self, state: u32) -> bool {
563 self.flags & state != 0
564 }
565
566 fn handshake_send(&mut self) {
567 self.flags |= Self::FLUSH;
568 let mut syn = Vec::<u8>::new();
569 syn.put_slice(&self.config.session_key);
570 syn.put_slice(&self.session_id);
571 self.kcp.send(syn.as_slice()).unwrap();
572 self.kcp_flush();
573 }
574
575 fn syn_handshake_recv(&mut self, output_tx: &Sender<Output>) -> Result<(), ()> {
576 let syn = match self.kcp_recv() {
578 Some(x) => x,
579 _ => return Ok(()),
580 };
581 if let Some(session_id) = syn
583 .strip_prefix(self.config.session_key.deref())
584 .and_then(|x| {
585 if x.len() == self.config.session_id_len {
586 Some(x)
587 } else {
588 None
589 }
590 })
591 {
592 if self.is_client() {
594 if self.session_id != session_id {
596 return Err(());
597 }
598 self.kcp_apply_nodelay();
599 } else {
600 self.session_id = session_id.to_vec();
601 self.kcp_apply_nodelay();
602 self.handshake_send();
603 }
604
605 self.set_handshake(Handshake::Connected);
607 return output_tx
608 .try_send(Output::Connected {
609 conv: self.kcp.conv(),
610 })
611 .map_err(|_| ());
612 }
613 Err(())
614 }
615
616 fn syn_handshake_input(&mut self, packet: &[u8]) -> io::Result<()> {
617 if let Some(conv) = Kcp::read_conv(packet) {
619 if self.is_client() {
620 if self.hs == Handshake::Syn && conv != Kcp::SYN_CONV {
621 self.kcp.set_conv(conv);
624 if self.kcp.input(packet).is_err() || self.kcp.get_waitsnd() > 0 {
625 self.kcp.set_conv(Kcp::SYN_CONV);
627 }
628 }
629 return Ok(());
630 } else if conv == Kcp::SYN_CONV {
631 let mine = self.kcp.conv();
633 self.kcp.set_conv(conv);
635 self.kcp.input(packet).ok();
636 self.kcp.set_conv(mine);
638 return Ok(());
639 }
640 }
641 Err(io::ErrorKind::InvalidInput.into())
642 }
643
644 #[inline]
645 fn is_fin_handshake(&self) -> bool {
646 (Handshake::FinPending..Handshake::FinWaitPeer).contains(&self.hs)
647 }
648
649 fn set_handshake(&mut self, hs: Handshake) {
650 if self.hs != hs {
651 debug!("{} -> {:?}", self, hs);
652 self.hs = hs;
653 }
654 }
655
656 fn fin_transit_state(&mut self) {
657 loop {
658 let state = match self.hs {
659 Handshake::Syn => Handshake::Disconnected,
660 Handshake::Connected => {
661 self.hs_end_time = self.kcp.get_system_time()
662 + self.config.shutdown_timeout.as_secs() as u32 * 1000;
663 Handshake::FinPending
664 }
665 Handshake::FinPending => {
666 debug!(
667 "{} input closed: {}, waitsnd: {}",
668 &self,
669 self.has(Self::INPUT_CLOSED),
670 self.kcp.get_waitsnd()
671 );
672 if !self.has(Self::INPUT_CLOSED) || self.kcp.get_waitsnd() > 0 {
673 break;
674 }
675 self.handshake_send();
676 Handshake::FinSent
677 }
678 Handshake::FinSent => {
679 debug!("{} waitsnd: {}", self, self.kcp.get_waitsnd());
680 if self.kcp.get_waitsnd() > 0 {
681 break;
682 }
683 Handshake::FinWaitPeer
684 }
685 Handshake::FinWaitPeer => {
686 debug!(
687 "{} waitsnd: {} + {}",
688 self,
689 self.kcp.nrcv_que(),
690 self.kcp.nrcv_buf(),
691 );
692 if !self.has(Self::FIN_RECVED) || self.kcp.nrcv_que() + self.kcp.nrcv_buf() > 0
694 {
695 break;
696 }
697 Handshake::Disconnected
698 }
699 Handshake::Disconnected => break,
700 };
701 self.set_handshake(state);
702 }
703 }
704
705 fn process_input(&mut self, frame: Bytes) {
706 match self.kcp.send(frame.deref()) {
707 Ok(_) => {
708 if self.config.nodelay.nodelay || self.kcp.nsnd_que() > 1 {
710 self.flags |= Self::FLUSH;
711 }
712 }
713 _ => error!(
714 "Too big frame size: {} > {}",
715 frame.len(),
716 Kcp::max_frame_size(self.config.mtu)
717 ),
718 }
719 }
720
721 fn kcp_apply_config(&mut self) {
722 self.kcp.set_stream(self.config.stream);
723 self.kcp.set_mtu(self.config.mtu).unwrap();
724 self.kcp
725 .set_wndsize(self.config.snd_wnd, self.config.rcv_wnd);
726 self.kcp_apply_nodelay();
727
728 let size = self.config.mtu as usize * 3;
730 if self.rx_buf.len() < size {
731 self.rx_buf.resize(size, 0);
732 }
733 }
734
735 fn kcp_apply_nodelay(&mut self) {
736 self.kcp.set_nodelay(
737 self.config.nodelay.nodelay,
738 self.config.nodelay.interval,
739 self.config.nodelay.resend,
740 self.config.nodelay.nc,
741 );
742 }
743
744 fn kcp_recv(&mut self) -> Option<Bytes> {
745 if self.has(Self::FIN_RECVED) && self.kcp.nrcv_que() == 1 && self.kcp.nrcv_buf() == 0 {
747 if let Some(fin) = self.kcp.recv_bytes() {
749 if Some(&self.session_id[..]) == fin.strip_prefix(self.config.session_key.deref()) {
750 debug!("{} receive FIN frame", self);
751 }
752 }
753 None
754 } else {
755 self.kcp.recv_bytes()
756 }
757 }
758
759 fn kcp_flush(&mut self) {
760 if self.has(Self::FLUSH) {
761 self.flags ^= Self::FLUSH;
762 self.kcp.update(self.kcp.get_system_time());
763 self.kcp.flush();
764 self.last_io_time = self.kcp.current();
765 }
766 }
767
768 fn kcp_input(&mut self, mut packet: BytesMut) {
769 match self.kcp.input(&packet) {
770 Ok(_) => self.flags |= Self::FLUSH,
771 Err(e) => match e.kind() {
772 io::ErrorKind::NotFound => {
773 if self.syn_handshake_input(&packet).is_ok() {
775 return;
776 }
777 trace!(
778 "{} conv does not match: {}",
779 self,
780 Kcp::read_conv(&self.rx_buf).unwrap_or(0),
781 );
782 }
783 io::ErrorKind::InvalidData => {
784 let cmd = Kcp::read_cmd(&packet);
785 if cmd & Self::KCP_RESET != 0 {
786 debug!("{} receive RESET", self);
787 self.set_handshake(Handshake::Disconnected);
788 return;
789 }
790 if cmd & Self::KCP_FIN != 0 {
791 if !self.has(Self::FIN_RECVED) {
792 self.flags |= Self::FIN_RECVED;
793 self.input_rx.close();
795 }
796 self.fin_transit_state();
797 Kcp::write_cmd(&mut packet, cmd ^ Self::KCP_FIN);
798 if self.kcp.input(&packet).is_ok() {
799 return;
800 }
801 }
802 trace!("packet parse error");
803 }
804 _ => unreachable!(),
805 },
806 }
807 }
808
809 async fn kcp_output(kcp: &mut Kcp, sink: &mut T, hs: Handshake) -> Result<(), ()> {
810 while let Some(mut packet) = kcp.pop_output() {
811 if hs == Handshake::FinSent {
812 if Kcp::read_payload_data(&packet).is_some() {
814 let cmd = Kcp::read_cmd(&packet) | Self::KCP_FIN;
815 Kcp::write_cmd(&mut packet, cmd);
816 }
817 }
818
819 if let Err(e) = sink.feed(packet.into()).await {
820 while kcp.pop_output().is_some() {}
822 warn!("send to sink: {}", e);
823 break;
824 }
825 }
826 sink.flush().await.map_err(|_| ())?;
827 Ok(())
828 }
829}