remoc/chmux/
sender.rs

1use bytes::Bytes;
2use futures::{
3    Future, FutureExt, future, ready,
4    sink::Sink,
5    task::{Context, Poll},
6};
7use std::{
8    error::Error,
9    fmt,
10    mem::size_of,
11    pin::Pin,
12    sync::{
13        Arc, Weak,
14        atomic::{AtomicBool, Ordering},
15    },
16};
17use tokio::sync::{Mutex, mpsc, oneshot};
18use tokio_util::sync::ReusableBoxFuture;
19
20use super::{
21    AnyStorage, Connect, ConnectError, PortAllocator, PortReq,
22    client::ConnectResponse,
23    credit::{AssignedCredits, CreditUser},
24    mux::PortEvt,
25};
26use crate::exec;
27
28/// An error occurred during sending of a message.
29#[derive(Debug, Clone)]
30#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
31pub enum SendError {
32    /// The multiplexer terminated.
33    ChMux,
34    /// Other side closed receiving end of channel.
35    Closed {
36        /// True, if remote endpoint still processes messages that were already sent.
37        gracefully: bool,
38    },
39}
40
41impl SendError {
42    /// Returns true, if error it due to channel being closed.
43    pub fn is_closed(&self) -> bool {
44        matches!(self, Self::Closed { gracefully: true })
45    }
46
47    /// True, if the remote endpoint closed the channel, was dropped or the connection failed.
48    #[deprecated = "a chmux::SendError is always due to disconnection"]
49    pub fn is_disconnected(&self) -> bool {
50        true
51    }
52
53    /// Returns whether the error is final, i.e. no further send operation can succeed.
54    #[deprecated = "a remoc::chmux::SendError is always final"]
55    pub fn is_final(&self) -> bool {
56        true
57    }
58}
59
60impl fmt::Display for SendError {
61    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
62        match self {
63            Self::ChMux => write!(f, "multiplexer terminated"),
64            Self::Closed { gracefully } => write!(
65                f,
66                "remote endpoint closed channel{}",
67                if *gracefully { " but still processes sent messages" } else { "" }
68            ),
69        }
70    }
71}
72
73impl Error for SendError {}
74
75impl<T> From<mpsc::error::SendError<T>> for SendError {
76    fn from(_err: mpsc::error::SendError<T>) -> Self {
77        Self::ChMux
78    }
79}
80
81impl From<SendError> for std::io::Error {
82    fn from(err: SendError) -> Self {
83        use std::io::ErrorKind;
84        match err {
85            SendError::ChMux => Self::new(ErrorKind::ConnectionReset, err.to_string()),
86            SendError::Closed { gracefully: false } => Self::new(ErrorKind::ConnectionReset, err.to_string()),
87            SendError::Closed { gracefully: true } => Self::new(ErrorKind::ConnectionAborted, err.to_string()),
88        }
89    }
90}
91
92/// An error occurred during sending of a message.
93#[derive(Debug)]
94pub enum TrySendError {
95    /// Channel queue is full.
96    ///
97    /// Sending should be retried.
98    Full,
99    /// Send error.
100    Send(SendError),
101}
102
103impl TrySendError {
104    /// True, if the remote endpoint closed the channel.
105    pub fn is_closed(&self) -> bool {
106        match self {
107            Self::Full => false,
108            Self::Send(err) => err.is_closed(),
109        }
110    }
111
112    /// Returns whether the error is final, i.e. no further send operation can succeed.
113    pub fn is_final(&self) -> bool {
114        match self {
115            Self::Full => false,
116            Self::Send(_) => true,
117        }
118    }
119}
120
121impl fmt::Display for TrySendError {
122    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123        match self {
124            Self::Full => write!(f, "channel queue is full"),
125            Self::Send(err) => write!(f, "{err}"),
126        }
127    }
128}
129
130impl From<SendError> for TrySendError {
131    fn from(err: SendError) -> Self {
132        Self::Send(err)
133    }
134}
135
136impl From<mpsc::error::TrySendError<PortEvt>> for TrySendError {
137    fn from(err: mpsc::error::TrySendError<PortEvt>) -> Self {
138        match err {
139            mpsc::error::TrySendError::Full(_) => Self::Full,
140            mpsc::error::TrySendError::Closed(_) => Self::Send(SendError::ChMux),
141        }
142    }
143}
144
145impl Error for TrySendError {}
146
147/// This future resolves when the remote endpoint has closed its receiver.
148///
149/// It will also resolve when the channel is closed or the channel multiplexer
150/// is shutdown.
151pub struct Closed {
152    fut: Pin<Box<dyn Future<Output = ()> + Send>>,
153}
154
155impl fmt::Debug for Closed {
156    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157        f.debug_tuple("Closed").finish()
158    }
159}
160
161impl Closed {
162    fn new(hangup_notify: &Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>) -> Self {
163        match hangup_notify.upgrade() {
164            Some(hangup_notify) => {
165                if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() {
166                    let (tx, rx) = oneshot::channel();
167                    notifiers.push(tx);
168                    Self {
169                        fut: async move {
170                            let _ = rx.await;
171                        }
172                        .boxed(),
173                    }
174                } else {
175                    Self { fut: future::ready(()).boxed() }
176                }
177            }
178            _ => Self { fut: future::ready(()).boxed() },
179        }
180    }
181}
182
183impl Future for Closed {
184    type Output = ();
185    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
186        self.fut.as_mut().poll(cx)
187    }
188}
189
190/// Sends byte data over a channel.
191pub struct Sender {
192    local_port: u32,
193    remote_port: u32,
194    chunk_size: usize,
195    max_data_size: usize,
196    tx: mpsc::Sender<PortEvt>,
197    credits: CreditUser,
198    hangup_recved: Weak<AtomicBool>,
199    hangup_notify: Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>,
200    port_allocator: PortAllocator,
201    storage: AnyStorage,
202    _drop_tx: oneshot::Sender<()>,
203}
204
205impl fmt::Debug for Sender {
206    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
207        f.debug_struct("Sender")
208            .field("local_port", &self.local_port)
209            .field("remote_port", &self.remote_port)
210            .field("chunk_size", &self.chunk_size)
211            .field("max_data_size", &self.max_data_size)
212            .field("is_closed", &self.is_closed())
213            .finish()
214    }
215}
216
217impl Sender {
218    /// Create a new sender.
219    #[allow(clippy::too_many_arguments)]
220    pub(crate) fn new(
221        local_port: u32, remote_port: u32, chunk_size: usize, max_data_size: usize, tx: mpsc::Sender<PortEvt>,
222        credits: CreditUser, hangup_recved: Weak<AtomicBool>,
223        hangup_notify: Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>, port_allocator: PortAllocator,
224        storage: AnyStorage,
225    ) -> Self {
226        let (_drop_tx, drop_rx) = oneshot::channel();
227        let tx_drop = tx.clone();
228        exec::spawn(async move {
229            let _ = drop_rx.await;
230            let _ = tx_drop.send(PortEvt::SenderDropped { local_port }).await;
231        });
232
233        Self {
234            local_port,
235            remote_port,
236            chunk_size,
237            max_data_size,
238            tx,
239            credits,
240            hangup_recved,
241            hangup_notify,
242            port_allocator,
243            storage,
244            _drop_tx,
245        }
246    }
247
248    /// The local port number.
249    pub fn local_port(&self) -> u32 {
250        self.local_port
251    }
252
253    /// The remote port number.
254    pub fn remote_port(&self) -> u32 {
255        self.remote_port
256    }
257
258    /// Maximum chunk size that can be sent.
259    ///
260    /// This is set by the remote endpoint.
261    pub fn chunk_size(&self) -> usize {
262        self.chunk_size
263    }
264
265    /// Configured maximum data size of receiver.
266    ///
267    /// This is not a limit for the sender and only provided here for
268    /// advisory purposes.
269    pub fn max_data_size(&self) -> usize {
270        self.max_data_size
271    }
272
273    /// Sends data over the channel.
274    ///
275    /// Waits until send space becomes available.
276    /// Data is transmitted in chunks if it exceeds the maximum chunk size.
277    ///
278    /// # Cancel safety
279    /// If this function is cancelled before completion, the remote endpoint will receive no data.
280    pub async fn send(&mut self, mut data: Bytes) -> Result<(), SendError> {
281        if data.is_empty() {
282            let mut credits = self.credits.request(1, 1).await?;
283            credits.take(1);
284
285            let msg = PortEvt::SendData { remote_port: self.remote_port, data, first: true, last: true };
286            self.tx.send(msg).await?;
287        } else {
288            let mut first = true;
289            let mut credits = AssignedCredits::default();
290
291            while !data.is_empty() {
292                if credits.is_empty() {
293                    credits = self.credits.request(data.len().min(u32::MAX as usize) as u32, 1).await?;
294                }
295
296                let at = data.len().min(self.chunk_size).min(credits.available() as usize);
297                let chunk = data.split_to(at);
298
299                credits.take(chunk.len() as u32);
300
301                let msg = PortEvt::SendData {
302                    remote_port: self.remote_port,
303                    data: chunk,
304                    first,
305                    last: data.is_empty(),
306                };
307                self.tx.send(msg).await?;
308
309                first = false;
310            }
311        }
312
313        Ok(())
314    }
315
316    /// Streams a message by sending individual chunks.
317    pub fn send_chunks(&mut self) -> ChunkSender<'_> {
318        ChunkSender { sender: self, credits: AssignedCredits::default(), first: true }
319    }
320
321    /// Tries to send data over the channel.
322    ///
323    /// Does not wait until send space becomes available.
324    /// The maximum size of data sendable by this function is limited by
325    /// the total receive buffer size.
326    pub fn try_send(&mut self, data: &Bytes) -> Result<(), TrySendError> {
327        let mut data = data.clone();
328
329        if data.is_empty() {
330            match self.credits.try_request(1)? {
331                Some(mut credits) => {
332                    credits.take(1);
333                    let msg = PortEvt::SendData { remote_port: self.remote_port, data, first: true, last: true };
334                    self.tx.try_send(msg)?;
335                    Ok(())
336                }
337                None => Err(TrySendError::Full),
338            }
339        } else {
340            match self.credits.try_request(data.len().min(u32::MAX as usize) as u32)? {
341                Some(mut credits) => {
342                    let mut first = true;
343                    while !data.is_empty() {
344                        let at = data.len().min(self.chunk_size);
345                        let chunk = data.split_to(at);
346
347                        credits.take(chunk.len() as u32);
348
349                        let msg = PortEvt::SendData {
350                            remote_port: self.remote_port,
351                            data: chunk,
352                            first,
353                            last: data.is_empty(),
354                        };
355                        self.tx.try_send(msg)?;
356
357                        first = false;
358                    }
359                    Ok(())
360                }
361                None => Err(TrySendError::Full),
362            }
363        }
364    }
365
366    /// Sends port open requests over this port and returns the connect requests.
367    ///
368    /// The receiver limits the number of ports sendable per call, see
369    /// [Receiver::max_ports](super::Receiver::max_ports).
370    pub async fn connect(&mut self, ports: Vec<PortReq>, wait: bool) -> Result<Vec<Connect>, SendError> {
371        let mut ports_response = Vec::new();
372        let mut sent_txs = Vec::new();
373        let mut connects = Vec::new();
374
375        for port in ports {
376            let (response_tx, response_rx) = oneshot::channel();
377            ports_response.push((port, response_tx));
378
379            let response = exec::spawn(async move {
380                match response_rx.await {
381                    Ok(ConnectResponse::Accepted(sender, receiver)) => Ok((sender, receiver)),
382                    Ok(ConnectResponse::Rejected { no_ports }) => {
383                        if no_ports {
384                            Err(ConnectError::RemotePortsExhausted)
385                        } else {
386                            Err(ConnectError::Rejected)
387                        }
388                    }
389                    Err(_) => Err(ConnectError::ChMux),
390                }
391            });
392
393            let (sent_tx, sent_rx) = mpsc::channel(1);
394            sent_txs.push(sent_tx);
395
396            connects.push(Connect { sent_rx, response });
397        }
398
399        let mut first = true;
400        let mut credits = AssignedCredits::default();
401
402        while !ports_response.is_empty() {
403            if credits.is_empty() {
404                let data_len = ports_response.len() * size_of::<u32>();
405                credits =
406                    self.credits.request(data_len.min(u32::MAX as usize) as u32, size_of::<u32>() as u32).await?;
407            }
408
409            let max_ports = self.chunk_size.min(credits.available() as usize) / size_of::<u32>();
410            let next =
411                if ports_response.len() > max_ports { ports_response.split_off(max_ports) } else { Vec::new() };
412
413            credits.take((ports_response.len() * size_of::<u32>()) as u32);
414
415            let msg = PortEvt::SendPorts {
416                remote_port: self.remote_port,
417                first,
418                last: next.is_empty(),
419                wait,
420                ports: ports_response,
421            };
422            self.tx.send(msg).await?;
423
424            ports_response = next;
425            first = false;
426        }
427
428        Ok(connects)
429    }
430
431    /// True, once the remote endpoint has closed its receiver.
432    pub fn is_closed(&self) -> bool {
433        self.hangup_recved.upgrade().map(|hr| hr.load(Ordering::Relaxed)).unwrap_or_default()
434    }
435
436    /// Returns a future that will resolve when the remote endpoint closes its receiver.
437    pub fn closed(&self) -> Closed {
438        Closed::new(&self.hangup_notify)
439    }
440
441    /// Returns whether data can be sent anyway, even if remote endpoint closed the channel gracefully.
442    ///
443    /// Sending always fails if remote endpoint closed the channel non-gracefully, for example
444    /// by dropping the receiver.
445    ///
446    /// By default this is false.
447    pub fn is_graceful_close_overridden(&self) -> bool {
448        self.credits.override_graceful_close
449    }
450
451    /// Sets whether data should be sent anyway, even if remote endpoint closed the channel gracefully.
452    ///
453    /// Sending always fails if remote endpoint closed the channel non-gracefully, for example
454    /// by dropping the receiver.
455    pub fn set_override_graceful_close(&mut self, override_graceful_close: bool) {
456        self.credits.override_graceful_close = override_graceful_close;
457    }
458
459    /// Convert this into a sink.
460    pub fn into_sink(self) -> SenderSink {
461        SenderSink::new(self)
462    }
463
464    /// Returns the port allocator of the channel multiplexer.
465    pub fn port_allocator(&self) -> PortAllocator {
466        self.port_allocator.clone()
467    }
468
469    /// Returns the arbitrary data storage of the channel multiplexer.
470    pub fn storage(&self) -> AnyStorage {
471        self.storage.clone()
472    }
473}
474
475impl Drop for Sender {
476    fn drop(&mut self) {
477        // required for correct drop order
478    }
479}
480
481/// Sends chunks of a message to the remote endpoint.
482///
483/// You must call [finish](Self::finish) to finalize the sending of the message.
484/// Drop the chunk sender to cancel the message.
485pub struct ChunkSender<'a> {
486    sender: &'a mut Sender,
487    credits: AssignedCredits,
488    first: bool,
489}
490
491impl<'a> ChunkSender<'a> {
492    async fn send_int(&mut self, mut data: Bytes, finish: bool) -> Result<(), SendError> {
493        if data.is_empty() {
494            if self.credits.is_empty() {
495                self.credits = self.sender.credits.request(1, 1).await?;
496            }
497            self.credits.take(1);
498
499            let msg =
500                PortEvt::SendData { remote_port: self.sender.remote_port, data, first: self.first, last: finish };
501            self.sender.tx.send(msg).await?;
502
503            self.first = false;
504        } else {
505            while !data.is_empty() {
506                if self.credits.is_empty() {
507                    self.credits =
508                        self.sender.credits.request(data.len().min(u32::MAX as usize) as u32, 1).await?;
509                }
510
511                let at = data.len().min(self.sender.chunk_size).min(self.credits.available() as usize);
512                let chunk = data.split_to(at);
513
514                self.credits.take(chunk.len() as u32);
515
516                let msg = PortEvt::SendData {
517                    remote_port: self.sender.remote_port,
518                    data: chunk,
519                    first: self.first,
520                    last: data.is_empty() && finish,
521                };
522                self.sender.tx.send(msg).await?;
523
524                self.first = false;
525            }
526        }
527
528        Ok(())
529    }
530
531    /// Sends a non-final chunk of a message.
532    ///
533    /// The boundaries of chunks within a message may change during transmission,
534    /// thus there is no guarantee that [Receiver::recv_chunk](super::Receiver::recv_chunk)
535    /// will return the same chunks as sent.
536    pub async fn send(mut self, chunk: Bytes) -> Result<ChunkSender<'a>, SendError> {
537        self.send_int(chunk, false).await?;
538        Ok(self)
539    }
540
541    /// Send the final chunk of a message.
542    ///
543    /// This saves one multiplexer message compared to calling [send](Self::send)
544    /// followed by [finish](Self::finish).
545    pub async fn send_final(mut self, chunk: Bytes) -> Result<(), SendError> {
546        self.send_int(chunk, true).await
547    }
548
549    /// Finishes the message.
550    pub async fn finish(mut self) -> Result<(), SendError> {
551        self.send_int(Bytes::new(), true).await
552    }
553}
554
555/// A sink sending byte data over a channel.
556pub struct SenderSink {
557    sender: Option<Arc<Mutex<Sender>>>,
558    send_fut: Option<ReusableBoxFuture<'static, Result<(), SendError>>>,
559}
560
561impl SenderSink {
562    fn new(sender: Sender) -> Self {
563        Self { sender: Some(Arc::new(Mutex::new(sender))), send_fut: None }
564    }
565
566    async fn send(sender: Arc<Mutex<Sender>>, data: Bytes) -> Result<(), SendError> {
567        let mut sender = sender.lock().await;
568        sender.send(data).await
569    }
570
571    fn start_send(&mut self, data: Bytes) -> Result<(), SendError> {
572        if self.send_fut.is_some() {
573            panic!("sink is not ready for sending");
574        }
575
576        match self.sender.clone() {
577            Some(sender) => {
578                self.send_fut = Some(ReusableBoxFuture::new(Self::send(sender, data)));
579                Ok(())
580            }
581            None => panic!("start_send after sink has been closed"),
582        }
583    }
584
585    fn poll_send(&mut self, cx: &mut Context) -> Poll<Result<(), SendError>> {
586        match &mut self.send_fut {
587            Some(fut) => {
588                let res = ready!(fut.poll(cx));
589                self.send_fut = None;
590                Poll::Ready(res)
591            }
592            None => Poll::Ready(Ok(())),
593        }
594    }
595
596    fn close(&mut self) {
597        self.sender = None;
598    }
599}
600
601impl Sink<Bytes> for SenderSink {
602    type Error = SendError;
603
604    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
605        Pin::into_inner(self).poll_send(cx)
606    }
607
608    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
609        Pin::into_inner(self).start_send(item)
610    }
611
612    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
613        Pin::into_inner(self).poll_send(cx)
614    }
615
616    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
617        ready!(Pin::into_inner(self.as_mut()).poll_send(cx))?;
618        Pin::into_inner(self).close();
619        Poll::Ready(Ok(()))
620    }
621}