remoc/rch/mpsc/
sender.rs

1use futures::{FutureExt, Sink};
2use serde::{Deserialize, Serialize};
3use std::{
4    convert::TryFrom,
5    error::Error,
6    fmt,
7    marker::PhantomData,
8    pin::Pin,
9    sync::{Arc, Weak},
10    task::{Context, Poll, ready},
11};
12use tokio_util::sync::ReusableBoxFuture;
13
14use super::{
15    super::{
16        ClosedReason, DEFAULT_BUFFER, DEFAULT_MAX_ITEM_SIZE, RemoteSendError, SendErrorExt, Sending,
17        base::{self, PortDeserializer, PortSerializer},
18    },
19    SendReq,
20    receiver::RecvError,
21    send_req,
22};
23use crate::{RemoteSend, chmux, codec, exec, rch::SendingError};
24
25/// An error occurred during sending over an mpsc channel.
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub enum SendError<T> {
28    /// The remote end closed the channel.
29    Closed(T),
30    /// Sending to a remote endpoint failed.
31    RemoteSend(base::SendErrorKind),
32    /// Connecting a sent channel failed.
33    RemoteConnect(chmux::ConnectError),
34    /// Listening for a received channel failed.
35    RemoteListen(chmux::ListenerError),
36    /// Forwarding at a remote endpoint to another remote endpoint failed.
37    RemoteForward,
38}
39
40impl<T> SendError<T> {
41    /// True, if the remote endpoint closed the channel.
42    pub fn is_closed(&self) -> bool {
43        matches!(self, Self::Closed(_))
44    }
45
46    /// Returns the reason for why the channel has been disconnected.
47    ///
48    /// Returns [None] if the error is not due to the channel being disconnected.
49    /// Currently this can only happen if a serialization error occurred.
50    pub fn closed_reason(&self) -> Option<ClosedReason> {
51        match self {
52            Self::RemoteSend(base::SendErrorKind::Serialize(_)) => None,
53            Self::RemoteSend(base::SendErrorKind::Send(chmux::SendError::Closed { .. })) => {
54                Some(ClosedReason::Dropped)
55            }
56            Self::Closed(_) => Some(ClosedReason::Closed),
57            _ => Some(ClosedReason::Failed),
58        }
59    }
60
61    /// True, if the remote endpoint closed the channel, was dropped or the connection failed.
62    pub fn is_disconnected(&self) -> bool {
63        !matches!(self, Self::RemoteSend(base::SendErrorKind::Serialize(_)))
64    }
65
66    /// Returns whether the error is final, i.e. no further send operation can succeed.
67    #[deprecated = "a remoc::rch::mpsc::SendError is always final"]
68    pub fn is_final(&self) -> bool {
69        true
70    }
71
72    /// Whether the error is caused by the item to be sent.
73    pub fn is_item_specific(&self) -> bool {
74        matches!(self, Self::RemoteSend(err) if err.is_item_specific())
75    }
76
77    /// Returns the error without the contained item.
78    pub fn without_item(self) -> SendError<()> {
79        match self {
80            Self::Closed(_) => SendError::Closed(()),
81            Self::RemoteSend(err) => SendError::RemoteSend(err),
82            Self::RemoteConnect(err) => SendError::RemoteConnect(err),
83            Self::RemoteListen(err) => SendError::RemoteListen(err),
84            Self::RemoteForward => SendError::RemoteForward,
85        }
86    }
87}
88
89impl<T> SendErrorExt for SendError<T> {
90    fn is_closed(&self) -> bool {
91        self.is_closed()
92    }
93
94    fn is_disconnected(&self) -> bool {
95        self.is_disconnected()
96    }
97
98    fn is_final(&self) -> bool {
99        #[allow(deprecated)]
100        self.is_final()
101    }
102
103    fn is_item_specific(&self) -> bool {
104        self.is_item_specific()
105    }
106}
107
108impl<T> fmt::Display for SendError<T> {
109    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
110        match self {
111            Self::Closed(_) => write!(f, "channel is closed"),
112            Self::RemoteSend(err) => write!(f, "send error: {err}"),
113            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
114            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
115            Self::RemoteForward => write!(f, "forwarding error"),
116        }
117    }
118}
119
120impl<T> Error for SendError<T> where T: fmt::Debug {}
121
122impl<T> SendError<T> {
123    fn from_remote_send_error(err: RemoteSendError, value: T) -> Self {
124        match err {
125            RemoteSendError::Send(err) => Self::RemoteSend(err),
126            RemoteSendError::Connect(err) => Self::RemoteConnect(err),
127            RemoteSendError::Listen(err) => Self::RemoteListen(err),
128            RemoteSendError::Forward => Self::RemoteForward,
129            RemoteSendError::Closed => Self::Closed(value),
130        }
131    }
132}
133
134/// An error occurred during trying to send over an mpsc channel.
135#[derive(Clone, Debug, Serialize, Deserialize)]
136pub enum TrySendError<T> {
137    /// The remote end closed the channel.
138    Closed(T),
139    /// The data could not be sent on the channel because the channel
140    /// is currently full and sending would require blocking.
141    Full(T),
142    /// Sending to a remote endpoint failed.
143    RemoteSend(base::SendErrorKind),
144    /// Connecting a sent channel failed.
145    RemoteConnect(chmux::ConnectError),
146    /// Listening for a received channel failed.
147    RemoteListen(chmux::ListenerError),
148    /// Forwarding at a remote endpoint to another remote endpoint failed.
149    RemoteForward,
150}
151
152impl<T> TrySendError<T> {
153    /// True, if the remote endpoint closed the channel.
154    pub fn is_closed(&self) -> bool {
155        matches!(self, Self::Closed(_))
156    }
157
158    /// True, if the remote endpoint closed the channel, was dropped or the connection failed.
159    pub fn is_disconnected(&self) -> bool {
160        !matches!(self, Self::RemoteSend(base::SendErrorKind::Serialize(_)) | Self::Full(_))
161    }
162
163    /// Returns whether the error is final, i.e. no further send operation can succeed.
164    pub fn is_final(&self) -> bool {
165        !matches!(self, Self::Full(_))
166    }
167
168    /// Whether the error is caused by the item to be sent.
169    pub fn is_item_specific(&self) -> bool {
170        matches!(self, Self::RemoteSend(err) if err.is_item_specific())
171    }
172}
173
174impl<T> SendErrorExt for TrySendError<T> {
175    fn is_closed(&self) -> bool {
176        self.is_closed()
177    }
178
179    fn is_disconnected(&self) -> bool {
180        self.is_disconnected()
181    }
182
183    fn is_final(&self) -> bool {
184        self.is_final()
185    }
186
187    fn is_item_specific(&self) -> bool {
188        self.is_item_specific()
189    }
190}
191
192impl<T> fmt::Display for TrySendError<T> {
193    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194        match self {
195            Self::Closed(_) => write!(f, "channel is closed"),
196            Self::Full(_) => write!(f, "channel is full"),
197            Self::RemoteSend(err) => write!(f, "send error: {err}"),
198            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
199            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
200            Self::RemoteForward => write!(f, "forwarding error"),
201        }
202    }
203}
204
205impl<T> TrySendError<T> {
206    fn from_remote_send_error(err: RemoteSendError, value: T) -> Self {
207        match err {
208            RemoteSendError::Send(err) => Self::RemoteSend(err),
209            RemoteSendError::Connect(err) => Self::RemoteConnect(err),
210            RemoteSendError::Listen(err) => Self::RemoteListen(err),
211            RemoteSendError::Forward => Self::RemoteForward,
212            RemoteSendError::Closed => Self::Closed(value),
213        }
214    }
215}
216
217impl<T> From<SendError<T>> for TrySendError<T> {
218    fn from(err: SendError<T>) -> Self {
219        match err {
220            SendError::Closed(v) => Self::Closed(v),
221            SendError::RemoteSend(err) => Self::RemoteSend(err),
222            SendError::RemoteConnect(err) => Self::RemoteConnect(err),
223            SendError::RemoteListen(err) => Self::RemoteListen(err),
224            SendError::RemoteForward => Self::RemoteForward,
225        }
226    }
227}
228
229impl<T> TryFrom<TrySendError<T>> for SendError<T> {
230    type Error = TrySendError<T>;
231
232    fn try_from(err: TrySendError<T>) -> Result<Self, Self::Error> {
233        match err {
234            TrySendError::Closed(v) => Ok(Self::Closed(v)),
235            TrySendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
236            TrySendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
237            TrySendError::RemoteForward => Ok(Self::RemoteForward),
238            other => Err(other),
239        }
240    }
241}
242
243impl<T> Error for TrySendError<T> where T: fmt::Debug {}
244
245/// Send values to the associated [Receiver](super::Receiver), which may be located on a remote endpoint.
246///
247/// Instances are created by the [channel](super::channel) function.
248///
249/// This can be converted into a [Sink] accepting values by wrapping it into a [SenderSink].
250pub struct Sender<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
251    tx: Weak<tokio::sync::mpsc::Sender<SendReq<T>>>,
252    closed_rx: tokio::sync::watch::Receiver<Option<ClosedReason>>,
253    remote_send_err_rx: tokio::sync::watch::Receiver<Option<RemoteSendError>>,
254    dropped_tx: tokio::sync::mpsc::Sender<()>,
255    max_item_size: usize,
256    _codec: PhantomData<Codec>,
257}
258
259impl<T, Codec, const BUFFER: usize> fmt::Debug for Sender<T, Codec, BUFFER> {
260    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261        f.debug_struct("Sender").finish()
262    }
263}
264
265impl<T, Codec, const BUFFER: usize> Clone for Sender<T, Codec, BUFFER> {
266    fn clone(&self) -> Self {
267        Self {
268            tx: self.tx.clone(),
269            closed_rx: self.closed_rx.clone(),
270            remote_send_err_rx: self.remote_send_err_rx.clone(),
271            dropped_tx: self.dropped_tx.clone(),
272            max_item_size: self.max_item_size,
273            _codec: PhantomData,
274        }
275    }
276}
277
278/// Mpsc sender in transport.
279#[derive(Serialize, Deserialize)]
280pub(crate) struct TransportedSender<T, Codec> {
281    /// chmux port number. `None` if closed.
282    port: Option<u32>,
283    /// Data type.
284    data: PhantomData<T>,
285    /// Data codec.
286    codec: PhantomData<Codec>,
287    /// Maximum item size in bytes.
288    #[serde(default = "default_max_item_size")]
289    max_item_size: u64,
290}
291
292const fn default_max_item_size() -> u64 {
293    u64::MAX
294}
295
296impl<T, Codec, const BUFFER: usize> Sender<T, Codec, BUFFER>
297where
298    T: Send + 'static,
299{
300    /// Creates a new sender.
301    pub(crate) fn new(
302        tx: tokio::sync::mpsc::Sender<SendReq<T>>,
303        mut closed_rx: tokio::sync::watch::Receiver<Option<ClosedReason>>,
304        remote_send_err_rx: tokio::sync::watch::Receiver<Option<RemoteSendError>>,
305    ) -> Self {
306        let tx = Arc::new(tx);
307        let (dropped_tx, mut dropped_rx) = tokio::sync::mpsc::channel(1);
308
309        let this = Self {
310            tx: Arc::downgrade(&tx),
311            closed_rx: closed_rx.clone(),
312            remote_send_err_rx,
313            dropped_tx,
314            max_item_size: DEFAULT_MAX_ITEM_SIZE,
315            _codec: PhantomData,
316        };
317
318        // Drop strong reference to sender when channel is closed.
319        exec::spawn(async move {
320            loop {
321                tokio::select! {
322                    res = closed_rx.changed() => {
323                        match res {
324                            Ok(()) if closed_rx.borrow().is_some() => break,
325                            Ok(()) => (),
326                            Err(_) => break,
327                        }
328                    },
329                    _ = dropped_rx.recv() => break,
330                }
331            }
332
333            drop(tx);
334        });
335
336        this
337    }
338
339    /// Creates a new sender that is closed.
340    pub(crate) fn new_closed() -> Self {
341        Self {
342            tx: Weak::new(),
343            closed_rx: tokio::sync::watch::channel(Some(ClosedReason::Closed)).1,
344            remote_send_err_rx: tokio::sync::watch::channel(None).1,
345            dropped_tx: tokio::sync::mpsc::channel(1).0,
346            max_item_size: DEFAULT_MAX_ITEM_SIZE,
347            _codec: PhantomData,
348        }
349    }
350
351    /// Sends a value over this channel.
352    ///
353    /// # Error reporting
354    /// Sending and error reporting are done asynchronously.
355    /// Thus, the reporting of an error may be delayed and this function may
356    /// return errors caused by previous invocations.
357    pub async fn send(&self, value: T) -> Result<Sending<T>, SendError<T>> {
358        if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
359            return Err(SendError::from_remote_send_error(err.clone(), value));
360        }
361
362        match self.tx.upgrade() {
363            Some(tx) => {
364                let (req, sent) = send_req(Ok(value));
365                match tx.send(req).await {
366                    Ok(()) => Ok(sent),
367                    Err(err) => Err(SendError::Closed(err.0.value.expect("unreachable"))),
368                }
369            }
370            None => Err(SendError::Closed(value)),
371        }
372    }
373
374    /// Attempts to immediately send a message over this channel.
375    ///
376    /// # Error reporting
377    /// Sending and error reporting are done asynchronously.
378    /// Thus, the reporting of an error may be delayed and this function may
379    /// return errors caused by previous invocations.
380    pub fn try_send(&self, value: T) -> Result<Sending<T>, TrySendError<T>> {
381        if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
382            return Err(TrySendError::from_remote_send_error(err.clone(), value));
383        }
384
385        match self.tx.upgrade() {
386            Some(tx) => {
387                let (req, sent) = send_req(Ok(value));
388                match tx.try_send(req) {
389                    Ok(()) => Ok(sent),
390                    Err(tokio::sync::mpsc::error::TrySendError::Full(err)) => {
391                        Err(TrySendError::Full(err.value.expect("unreachable")))
392                    }
393                    Err(tokio::sync::mpsc::error::TrySendError::Closed(err)) => {
394                        Err(TrySendError::Closed(err.value.expect("unreachable")))
395                    }
396                }
397            }
398            None => Err(TrySendError::Closed(value)),
399        }
400    }
401
402    /// Blocking send to call outside of asynchronous contexts.
403    ///
404    /// # Error reporting
405    /// Sending and error reporting are done asynchronously.
406    /// Thus, the reporting of an error may be delayed and this function may
407    /// return errors caused by previous invocations.
408    ///
409    /// # Panics
410    /// This function panics if called within an asynchronous execution context.
411    pub fn blocking_send(&self, value: T) -> Result<Sending<T>, SendError<T>> {
412        exec::task::block_on(self.send(value))
413    }
414
415    /// Wait for channel capacity, returning an owned permit.
416    /// Once capacity to send one message is available, it is reserved for the caller.
417    ///
418    /// # Error reporting
419    /// Sending and error reporting are done asynchronously.
420    /// Thus, the reporting of an error may be delayed and this function may
421    /// return errors caused by previous invocations.
422    pub async fn reserve(&self) -> Result<Permit<T>, SendError<()>> {
423        if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
424            return Err(SendError::from_remote_send_error(err.clone(), ()));
425        }
426
427        match self.tx.upgrade() {
428            Some(tx) => {
429                let tx = (*tx).clone();
430                match tx.reserve_owned().await {
431                    Ok(permit) => Ok(Permit(permit)),
432                    Err(_) => Err(SendError::Closed(())),
433                }
434            }
435            _ => Err(SendError::Closed(())),
436        }
437    }
438
439    /// Returns the current capacity of the channel.
440    ///
441    /// Zero is returned when the channel has been closed or an error has occurred.
442    pub fn capacity(&self) -> usize {
443        match self.tx.upgrade() {
444            Some(tx) => tx.capacity(),
445            None => 0,
446        }
447    }
448
449    /// Completes when the receiver has been closed, dropped or the connection failed.
450    ///
451    /// Use [closed_reason](Self::closed_reason) to obtain the cause for closure.
452    pub async fn closed(&self) {
453        let mut closed = self.closed_rx.clone();
454        while closed.borrow().is_none() {
455            if closed.changed().await.is_err() {
456                break;
457            }
458        }
459    }
460
461    /// Returns the reason for why the channel has been closed.
462    ///
463    /// Returns [None] if the channel is not closed.
464    pub fn closed_reason(&self) -> Option<ClosedReason> {
465        match (self.closed_rx.borrow().clone(), self.remote_send_err_rx.borrow().as_ref()) {
466            (Some(reason), _) => Some(reason),
467            (None, Some(_)) => Some(ClosedReason::Failed),
468            (None, None) => None,
469        }
470    }
471
472    /// Returns whether the receiver has been closed, dropped or the connection failed.
473    ///
474    /// Use [closed_reason](Self::closed_reason) to obtain the cause for closure.
475    pub fn is_closed(&self) -> bool {
476        self.closed_reason().is_some()
477    }
478
479    /// Sets the codec that will be used when sending this sender to a remote endpoint.
480    pub fn set_codec<NewCodec>(self) -> Sender<T, NewCodec, BUFFER> {
481        Sender {
482            tx: self.tx.clone(),
483            closed_rx: self.closed_rx.clone(),
484            remote_send_err_rx: self.remote_send_err_rx.clone(),
485            dropped_tx: self.dropped_tx.clone(),
486            max_item_size: self.max_item_size,
487            _codec: PhantomData,
488        }
489    }
490
491    /// Sets the buffer size that will be used when sending this sender to a remote endpoint.
492    pub fn set_buffer<const NEW_BUFFER: usize>(self) -> Sender<T, Codec, NEW_BUFFER> {
493        assert!(NEW_BUFFER > 0, "buffer size must not be zero");
494        Sender {
495            tx: self.tx.clone(),
496            closed_rx: self.closed_rx.clone(),
497            remote_send_err_rx: self.remote_send_err_rx.clone(),
498            dropped_tx: self.dropped_tx.clone(),
499            max_item_size: self.max_item_size,
500            _codec: PhantomData,
501        }
502    }
503
504    /// The maximum allowed item size in bytes.
505    pub fn max_item_size(&self) -> usize {
506        self.max_item_size
507    }
508
509    /// Sets the maximum allowed item size in bytes.
510    pub fn set_max_item_size(&mut self, max_item_size: usize) {
511        self.max_item_size = max_item_size;
512    }
513}
514
515/// Owned permit to send one value into the channel.
516pub struct Permit<T>(tokio::sync::mpsc::OwnedPermit<SendReq<T>>);
517
518impl<T> Permit<T>
519where
520    T: Send,
521{
522    /// Sends a value using the reserved capacity.
523    pub fn send(self, value: T) -> Sending<T> {
524        let (req, sent) = send_req(Ok(value));
525        self.0.send(req);
526        sent
527    }
528}
529
530impl<T, Codec, const BUFFER: usize> Drop for Sender<T, Codec, BUFFER> {
531    fn drop(&mut self) {
532        // empty
533    }
534}
535
536impl<T, Codec, const BUFFER: usize> Serialize for Sender<T, Codec, BUFFER>
537where
538    T: RemoteSend,
539    Codec: codec::Codec,
540{
541    /// Serializes this sender for sending over a chmux channel.
542    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
543    where
544        S: serde::Serializer,
545    {
546        let port = match self.tx.upgrade() {
547            // Channel is open.
548            Some(tx) => {
549                // Prepare channel for takeover.
550                let closed_rx = self.closed_rx.clone();
551                let remote_send_err_rx = self.remote_send_err_rx.clone();
552                let max_item_size = self.max_item_size;
553
554                Some(PortSerializer::connect(move |connect| {
555                    async move {
556                        // Establish chmux channel.
557                        let (raw_tx, raw_rx) = match connect.await {
558                            Ok(tx_rx) => tx_rx,
559                            Err(err) => {
560                                let _ = tx.send(SendReq::new(Err(RecvError::RemoteConnect(err)))).await;
561                                return;
562                            }
563                        };
564
565                        super::recv_impl::<T, Codec>(
566                            &tx,
567                            raw_tx,
568                            raw_rx,
569                            remote_send_err_rx,
570                            closed_rx,
571                            max_item_size,
572                        )
573                        .await;
574                    }
575                    .boxed()
576                })?)
577            }
578            None => {
579                // Channel is closed.
580                None
581            }
582        };
583
584        // Encode chmux port number in transport type and serialize it.
585        let transported = TransportedSender::<T, Codec> {
586            port,
587            data: PhantomData,
588            codec: PhantomData,
589            max_item_size: self.max_item_size.try_into().unwrap_or(u64::MAX),
590        };
591        transported.serialize(serializer)
592    }
593}
594
595impl<'de, T, Codec, const BUFFER: usize> Deserialize<'de> for Sender<T, Codec, BUFFER>
596where
597    T: RemoteSend,
598    Codec: codec::Codec,
599{
600    /// Deserializes this sender after it has been received over a chmux channel.
601    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
602    where
603        D: serde::Deserializer<'de>,
604    {
605        assert!(BUFFER > 0, "BUFFER must not be zero");
606
607        // Get chmux port number from deserialized transport type.
608        let TransportedSender { port, max_item_size, .. } =
609            TransportedSender::<T, Codec>::deserialize(deserializer)?;
610        let max_item_size = usize::try_from(max_item_size).unwrap_or(usize::MAX);
611
612        match port {
613            // Received channel is open.
614            Some(port) => {
615                // Create internal communication channels.
616                let (tx, rx) = tokio::sync::mpsc::channel(BUFFER);
617                let (closed_tx, closed_rx) = tokio::sync::watch::channel(None);
618                let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::watch::channel(None);
619
620                // Accept chmux port request.
621                PortDeserializer::accept(port, move |local_port, request| {
622                    async move {
623                        // Accept chmux connection request.
624                        let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
625                            Ok(tx_rx) => tx_rx,
626                            Err(err) => {
627                                let _ = remote_send_err_tx.send(Some(RemoteSendError::Listen(err)));
628                                return;
629                            }
630                        };
631
632                        super::send_impl::<T, Codec>(
633                            rx,
634                            raw_tx,
635                            raw_rx,
636                            remote_send_err_tx,
637                            closed_tx,
638                            max_item_size,
639                        )
640                        .await;
641                    }
642                    .boxed()
643                })?;
644
645                Ok(Self::new(tx, closed_rx, remote_send_err_rx))
646            }
647
648            // Received closed channel.
649            None => Ok(Self::new_closed()),
650        }
651    }
652}
653
654type ReserveRet<T, Codec, const BUFFER: usize> = (Result<Permit<T>, SendError<()>>, Sender<T, Codec, BUFFER>);
655
656/// A wrapper around an mpsc [Sender] that implements [Sink].
657pub struct SenderSink<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
658    tx: Option<Sender<T, Codec, BUFFER>>,
659    permit: Option<Permit<T>>,
660    reserve: Option<ReusableBoxFuture<'static, ReserveRet<T, Codec, BUFFER>>>,
661    sending: Option<Sending<T>>,
662}
663
664impl<T, Codec, const BUFFER: usize> fmt::Debug for SenderSink<T, Codec, BUFFER> {
665    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
666        f.debug_struct("SenderSink").field("ready", &self.permit.is_some()).finish()
667    }
668}
669
670impl<T, Codec, const BUFFER: usize> SenderSink<T, Codec, BUFFER>
671where
672    T: Send + 'static,
673    Codec: codec::Codec,
674{
675    /// Wraps a [Sender] to provide a [Sink].
676    pub fn new(tx: Sender<T, Codec, BUFFER>) -> Self {
677        Self {
678            tx: Some(tx.clone()),
679            permit: None,
680            reserve: Some(ReusableBoxFuture::new(Self::make_reserve(tx))),
681            sending: None,
682        }
683    }
684
685    fn new_closed() -> Self {
686        Self { tx: None, permit: None, reserve: None, sending: None }
687    }
688
689    /// Gets a reference to the [Sender] of the underlying channel.
690    ///
691    /// `None` is returned if the sink has been closed.
692    pub fn get_ref(&self) -> Option<&Sender<T, Codec, BUFFER>> {
693        self.tx.as_ref()
694    }
695
696    async fn make_reserve(tx: Sender<T, Codec, BUFFER>) -> ReserveRet<T, Codec, BUFFER> {
697        let result = tx.reserve().await;
698        (result, tx)
699    }
700}
701
702impl<T, Codec, const BUFFER: usize> Clone for SenderSink<T, Codec, BUFFER>
703where
704    T: Send + 'static,
705    Codec: codec::Codec,
706{
707    fn clone(&self) -> Self {
708        match self.tx.clone() {
709            Some(tx) => Self::new(tx),
710            None => Self::new_closed(),
711        }
712    }
713}
714
715impl<T, Codec, const BUFFER: usize> Sink<T> for SenderSink<T, Codec, BUFFER>
716where
717    T: Send + 'static,
718    Codec: codec::Codec,
719{
720    type Error = SendError<()>;
721
722    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
723        if self.permit.is_some() {
724            return Poll::Ready(Ok(()));
725        }
726
727        let Some(reserve) = self.reserve.as_mut() else { return Poll::Ready(Err(SendError::Closed(()))) };
728        let (permit, tx) = ready!(reserve.poll(cx));
729        reserve.set(Self::make_reserve(tx));
730
731        self.permit = Some(permit?);
732
733        Poll::Ready(Ok(()))
734    }
735
736    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
737        let permit = self.permit.take().expect("SenderSink is not ready for sending");
738        self.sending = Some(permit.send(item));
739        Ok(())
740    }
741
742    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
743        let Some(sending) = self.sending.as_mut() else { return Poll::Ready(Ok(())) };
744
745        let res = ready!(sending.poll_unpin(cx));
746        self.sending = None;
747
748        Poll::Ready(res.map_err(|err| match err {
749            SendingError::Send(base) => SendError::RemoteSend(base.kind),
750            SendingError::Dropped => SendError::Closed(()),
751        }))
752    }
753
754    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
755        self.tx = None;
756        self.permit = None;
757        self.reserve = None;
758        Poll::Ready(Ok(()))
759    }
760}
761
762impl<T, Codec, const BUFFER: usize> From<Sender<T, Codec, BUFFER>> for SenderSink<T, Codec, BUFFER>
763where
764    T: Send + 'static,
765    Codec: codec::Codec,
766{
767    fn from(tx: Sender<T, Codec, BUFFER>) -> Self {
768        Self::new(tx)
769    }
770}