kcp/transport/
mpsc.rs

1use super::duplex::DuplexStream;
2use futures::Sink;
3use pin_project::pin_project;
4use std::{
5    io,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_util::sync::PollSender;
12
13pub type MpscStream<S, R> = DuplexStream<PollSender<S>, S, ReceiverStream<R>>;
14
15pub fn tokio_mpsc_stream<S, R>(sender: Sender<S>, receiver: Receiver<R>) -> MpscStream<S, R>
16where
17    S: Send + 'static,
18{
19    DuplexStream::new(PollSender::new(sender), ReceiverStream::new(receiver))
20}
21
22#[pin_project]
23pub struct UnboundedSink<T> {
24    #[pin]
25    sender: Option<UnboundedSender<T>>,
26}
27
28impl<T> UnboundedSink<T> {
29    pub fn new(sender: UnboundedSender<T>) -> Self {
30        Self {
31            sender: Some(sender),
32        }
33    }
34
35    crate::future_delegate_access_inner!(sender, Option<UnboundedSender<T>>, ());
36}
37
38impl<T> Sink<T> for UnboundedSink<T> {
39    type Error = io::Error;
40
41    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42        if self.sender.is_some() {
43            Poll::Ready(Ok(()))
44        } else {
45            Poll::Ready(Err(io::ErrorKind::NotConnected.into()))
46        }
47    }
48
49    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50        if self.sender.is_some() {
51            Poll::Ready(Ok(()))
52        } else {
53            Poll::Ready(Err(io::ErrorKind::NotConnected.into()))
54        }
55    }
56
57    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
58        if let Some(sender) = &self.sender {
59            if sender.send(item).is_ok() {
60                return Ok(());
61            }
62        }
63        Err(io::ErrorKind::NotConnected.into())
64    }
65
66    fn poll_close(
67        mut self: Pin<&mut Self>,
68        _cx: &mut Context<'_>,
69    ) -> Poll<Result<(), Self::Error>> {
70        self.sender.take();
71        Poll::Ready(Ok(()))
72    }
73}