1use super::duplex::DuplexStream;
2use futures::Sink;
3use pin_project::pin_project;
4use std::{
5 io,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_util::sync::PollSender;
12
13pub type MpscStream<S, R> = DuplexStream<PollSender<S>, S, ReceiverStream<R>>;
14
15pub fn tokio_mpsc_stream<S, R>(sender: Sender<S>, receiver: Receiver<R>) -> MpscStream<S, R>
16where
17 S: Send + 'static,
18{
19 DuplexStream::new(PollSender::new(sender), ReceiverStream::new(receiver))
20}
21
22#[pin_project]
23pub struct UnboundedSink<T> {
24 #[pin]
25 sender: Option<UnboundedSender<T>>,
26}
27
28impl<T> UnboundedSink<T> {
29 pub fn new(sender: UnboundedSender<T>) -> Self {
30 Self {
31 sender: Some(sender),
32 }
33 }
34
35 crate::future_delegate_access_inner!(sender, Option<UnboundedSender<T>>, ());
36}
37
38impl<T> Sink<T> for UnboundedSink<T> {
39 type Error = io::Error;
40
41 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 if self.sender.is_some() {
43 Poll::Ready(Ok(()))
44 } else {
45 Poll::Ready(Err(io::ErrorKind::NotConnected.into()))
46 }
47 }
48
49 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50 if self.sender.is_some() {
51 Poll::Ready(Ok(()))
52 } else {
53 Poll::Ready(Err(io::ErrorKind::NotConnected.into()))
54 }
55 }
56
57 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
58 if let Some(sender) = &self.sender {
59 if sender.send(item).is_ok() {
60 return Ok(());
61 }
62 }
63 Err(io::ErrorKind::NotConnected.into())
64 }
65
66 fn poll_close(
67 mut self: Pin<&mut Self>,
68 _cx: &mut Context<'_>,
69 ) -> Poll<Result<(), Self::Error>> {
70 self.sender.take();
71 Poll::Ready(Ok(()))
72 }
73}