1use futures::{FutureExt, Sink};
2use serde::{Deserialize, Serialize};
3use std::{
4 convert::TryFrom,
5 error::Error,
6 fmt,
7 marker::PhantomData,
8 pin::Pin,
9 sync::{Arc, Weak},
10 task::{Context, Poll, ready},
11};
12use tokio_util::sync::ReusableBoxFuture;
13
14use super::{
15 super::{
16 ClosedReason, DEFAULT_BUFFER, DEFAULT_MAX_ITEM_SIZE, RemoteSendError, SendErrorExt, Sending,
17 base::{self, PortDeserializer, PortSerializer},
18 },
19 SendReq,
20 receiver::RecvError,
21 send_req,
22};
23use crate::{RemoteSend, chmux, codec, exec, rch::SendingError};
24
/// Error returned when sending a value over the channel fails.
///
/// Only [`Closed`](Self::Closed) carries the value that could not be sent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SendError<T> {
    /// The channel is closed; holds the value that was not sent.
    Closed(T),
    /// A send failure described by [`base::SendErrorKind`]
    /// (for example a serialization failure or a `chmux` send error).
    RemoteSend(base::SendErrorKind),
    /// A connect failure, carrying the underlying [`chmux::ConnectError`].
    RemoteConnect(chmux::ConnectError),
    /// A listen failure, carrying the underlying [`chmux::ListenerError`].
    RemoteListen(chmux::ListenerError),
    /// A forwarding failure; no further detail is carried.
    RemoteForward,
}
39
40impl<T> SendError<T> {
41 pub fn is_closed(&self) -> bool {
43 matches!(self, Self::Closed(_))
44 }
45
46 pub fn closed_reason(&self) -> Option<ClosedReason> {
51 match self {
52 Self::RemoteSend(base::SendErrorKind::Serialize(_)) => None,
53 Self::RemoteSend(base::SendErrorKind::Send(chmux::SendError::Closed { .. })) => {
54 Some(ClosedReason::Dropped)
55 }
56 Self::Closed(_) => Some(ClosedReason::Closed),
57 _ => Some(ClosedReason::Failed),
58 }
59 }
60
61 pub fn is_disconnected(&self) -> bool {
63 !matches!(self, Self::RemoteSend(base::SendErrorKind::Serialize(_)))
64 }
65
66 #[deprecated = "a remoc::rch::mpsc::SendError is always final"]
68 pub fn is_final(&self) -> bool {
69 true
70 }
71
72 pub fn is_item_specific(&self) -> bool {
74 matches!(self, Self::RemoteSend(err) if err.is_item_specific())
75 }
76
77 pub fn without_item(self) -> SendError<()> {
79 match self {
80 Self::Closed(_) => SendError::Closed(()),
81 Self::RemoteSend(err) => SendError::RemoteSend(err),
82 Self::RemoteConnect(err) => SendError::RemoteConnect(err),
83 Self::RemoteListen(err) => SendError::RemoteListen(err),
84 Self::RemoteForward => SendError::RemoteForward,
85 }
86 }
87}
88
// Exposes the inherent query methods of `SendError` through the `SendErrorExt` trait.
//
// Every call below uses method syntax, for which Rust prefers the inherent method of the
// same name over the trait method, so these forward rather than recurse.
impl<T> SendErrorExt for SendError<T> {
    fn is_closed(&self) -> bool {
        self.is_closed()
    }

    fn is_disconnected(&self) -> bool {
        self.is_disconnected()
    }

    fn is_final(&self) -> bool {
        // The inherent `is_final` is deprecated; the trait still has to provide it.
        #[allow(deprecated)]
        self.is_final()
    }

    fn is_item_specific(&self) -> bool {
        self.is_item_specific()
    }
}
107
108impl<T> fmt::Display for SendError<T> {
109 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
110 match self {
111 Self::Closed(_) => write!(f, "channel is closed"),
112 Self::RemoteSend(err) => write!(f, "send error: {err}"),
113 Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
114 Self::RemoteListen(err) => write!(f, "listen error: {err}"),
115 Self::RemoteForward => write!(f, "forwarding error"),
116 }
117 }
118}
119
// Marks `SendError` as a standard error type using only the trait's default methods.
// `T: Debug` is required because `Error` needs `Debug`, and the derived `Debug` needs it from `T`.
impl<T> Error for SendError<T> where T: fmt::Debug {}
121
122impl<T> SendError<T> {
123 fn from_remote_send_error(err: RemoteSendError, value: T) -> Self {
124 match err {
125 RemoteSendError::Send(err) => Self::RemoteSend(err),
126 RemoteSendError::Connect(err) => Self::RemoteConnect(err),
127 RemoteSendError::Listen(err) => Self::RemoteListen(err),
128 RemoteSendError::Forward => Self::RemoteForward,
129 RemoteSendError::Closed => Self::Closed(value),
130 }
131 }
132}
133
/// Error returned when a non-waiting send attempt fails.
///
/// Has the same variants as [`SendError`] plus [`Full`](Self::Full).
/// [`Closed`](Self::Closed) and [`Full`](Self::Full) carry the value that was not sent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TrySendError<T> {
    /// The channel is closed; holds the value that was not sent.
    Closed(T),
    /// The channel had no free capacity; holds the value that was not sent.
    Full(T),
    /// A send failure described by [`base::SendErrorKind`].
    RemoteSend(base::SendErrorKind),
    /// A connect failure, carrying the underlying [`chmux::ConnectError`].
    RemoteConnect(chmux::ConnectError),
    /// A listen failure, carrying the underlying [`chmux::ListenerError`].
    RemoteListen(chmux::ListenerError),
    /// A forwarding failure; no further detail is carried.
    RemoteForward,
}
151
152impl<T> TrySendError<T> {
153 pub fn is_closed(&self) -> bool {
155 matches!(self, Self::Closed(_))
156 }
157
158 pub fn is_disconnected(&self) -> bool {
160 !matches!(self, Self::RemoteSend(base::SendErrorKind::Serialize(_)) | Self::Full(_))
161 }
162
163 pub fn is_final(&self) -> bool {
165 !matches!(self, Self::Full(_))
166 }
167
168 pub fn is_item_specific(&self) -> bool {
170 matches!(self, Self::RemoteSend(err) if err.is_item_specific())
171 }
172}
173
// Exposes the inherent query methods of `TrySendError` through the `SendErrorExt` trait.
//
// Every call below uses method syntax, for which Rust prefers the inherent method of the
// same name over the trait method, so these forward rather than recurse.
impl<T> SendErrorExt for TrySendError<T> {
    fn is_closed(&self) -> bool {
        self.is_closed()
    }

    fn is_disconnected(&self) -> bool {
        self.is_disconnected()
    }

    fn is_final(&self) -> bool {
        self.is_final()
    }

    fn is_item_specific(&self) -> bool {
        self.is_item_specific()
    }
}
191
192impl<T> fmt::Display for TrySendError<T> {
193 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194 match self {
195 Self::Closed(_) => write!(f, "channel is closed"),
196 Self::Full(_) => write!(f, "channel is full"),
197 Self::RemoteSend(err) => write!(f, "send error: {err}"),
198 Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
199 Self::RemoteListen(err) => write!(f, "listen error: {err}"),
200 Self::RemoteForward => write!(f, "forwarding error"),
201 }
202 }
203}
204
205impl<T> TrySendError<T> {
206 fn from_remote_send_error(err: RemoteSendError, value: T) -> Self {
207 match err {
208 RemoteSendError::Send(err) => Self::RemoteSend(err),
209 RemoteSendError::Connect(err) => Self::RemoteConnect(err),
210 RemoteSendError::Listen(err) => Self::RemoteListen(err),
211 RemoteSendError::Forward => Self::RemoteForward,
212 RemoteSendError::Closed => Self::Closed(value),
213 }
214 }
215}
216
217impl<T> From<SendError<T>> for TrySendError<T> {
218 fn from(err: SendError<T>) -> Self {
219 match err {
220 SendError::Closed(v) => Self::Closed(v),
221 SendError::RemoteSend(err) => Self::RemoteSend(err),
222 SendError::RemoteConnect(err) => Self::RemoteConnect(err),
223 SendError::RemoteListen(err) => Self::RemoteListen(err),
224 SendError::RemoteForward => Self::RemoteForward,
225 }
226 }
227}
228
229impl<T> TryFrom<TrySendError<T>> for SendError<T> {
230 type Error = TrySendError<T>;
231
232 fn try_from(err: TrySendError<T>) -> Result<Self, Self::Error> {
233 match err {
234 TrySendError::Closed(v) => Ok(Self::Closed(v)),
235 TrySendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
236 TrySendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
237 TrySendError::RemoteForward => Ok(Self::RemoteForward),
238 other => Err(other),
239 }
240 }
241}
242
// Marks `TrySendError` as a standard error type using only the trait's default methods.
// `T: Debug` is required because `Error` needs `Debug`, and the derived `Debug` needs it from `T`.
impl<T> Error for TrySendError<T> where T: fmt::Debug {}
244
/// Sending half of the channel.
///
/// `Codec` selects the codec type used when the sender is serialized, and `BUFFER` is the
/// capacity of the local queue created when a sender is deserialized.
pub struct Sender<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
    /// Weak handle to the local queue sender; the strong reference is owned by the task
    /// spawned in `Sender::new`, so upgrading fails once that task has released it.
    tx: Weak<tokio::sync::mpsc::Sender<SendReq<T>>>,
    /// Watch channel that holds the close reason once one has been published.
    closed_rx: tokio::sync::watch::Receiver<Option<ClosedReason>>,
    /// Watch channel that holds a remote send error once one has been published.
    remote_send_err_rx: tokio::sync::watch::Receiver<Option<RemoteSendError>>,
    /// Sender half of a one-slot channel whose receiver is awaited by the task spawned in
    /// `Sender::new`. Nothing in this file sends on it.
    // NOTE(review): presumably it exists so that dropping the last handle wakes that task — confirm.
    dropped_tx: tokio::sync::mpsc::Sender<()>,
    /// Maximum item size, passed on when the sender is serialized.
    max_item_size: usize,
    /// Ties the codec type parameter to the struct without storing a value.
    _codec: PhantomData<Codec>,
}
258
259impl<T, Codec, const BUFFER: usize> fmt::Debug for Sender<T, Codec, BUFFER> {
260 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261 f.debug_struct("Sender").finish()
262 }
263}
264
265impl<T, Codec, const BUFFER: usize> Clone for Sender<T, Codec, BUFFER> {
266 fn clone(&self) -> Self {
267 Self {
268 tx: self.tx.clone(),
269 closed_rx: self.closed_rx.clone(),
270 remote_send_err_rx: self.remote_send_err_rx.clone(),
271 dropped_tx: self.dropped_tx.clone(),
272 max_item_size: self.max_item_size,
273 _codec: PhantomData,
274 }
275 }
276}
277
/// Serialized representation of a [`Sender`].
#[derive(Serialize, Deserialize)]
pub(crate) struct TransportedSender<T, Codec> {
    /// Port number of the connection; `None` when the sender was already closed at
    /// serialization time, in which case deserialization yields a closed sender.
    port: Option<u32>,
    /// Carries the item type without storing a value.
    data: PhantomData<T>,
    /// Carries the codec type without storing a value.
    codec: PhantomData<Codec>,
    /// Maximum item size; falls back to `u64::MAX` when the field is absent from the input.
    #[serde(default = "default_max_item_size")]
    max_item_size: u64,
}
291
/// Fallback for `TransportedSender::max_item_size` when the field is missing from the
/// serialized data: the largest representable size.
const fn default_max_item_size() -> u64 {
    const NO_LIMIT: u64 = u64::MAX;
    NO_LIMIT
}
295
296impl<T, Codec, const BUFFER: usize> Sender<T, Codec, BUFFER>
297where
298 T: Send + 'static,
299{
300 pub(crate) fn new(
302 tx: tokio::sync::mpsc::Sender<SendReq<T>>,
303 mut closed_rx: tokio::sync::watch::Receiver<Option<ClosedReason>>,
304 remote_send_err_rx: tokio::sync::watch::Receiver<Option<RemoteSendError>>,
305 ) -> Self {
306 let tx = Arc::new(tx);
307 let (dropped_tx, mut dropped_rx) = tokio::sync::mpsc::channel(1);
308
309 let this = Self {
310 tx: Arc::downgrade(&tx),
311 closed_rx: closed_rx.clone(),
312 remote_send_err_rx,
313 dropped_tx,
314 max_item_size: DEFAULT_MAX_ITEM_SIZE,
315 _codec: PhantomData,
316 };
317
318 exec::spawn(async move {
320 loop {
321 tokio::select! {
322 res = closed_rx.changed() => {
323 match res {
324 Ok(()) if closed_rx.borrow().is_some() => break,
325 Ok(()) => (),
326 Err(_) => break,
327 }
328 },
329 _ = dropped_rx.recv() => break,
330 }
331 }
332
333 drop(tx);
334 });
335
336 this
337 }
338
339 pub(crate) fn new_closed() -> Self {
341 Self {
342 tx: Weak::new(),
343 closed_rx: tokio::sync::watch::channel(Some(ClosedReason::Closed)).1,
344 remote_send_err_rx: tokio::sync::watch::channel(None).1,
345 dropped_tx: tokio::sync::mpsc::channel(1).0,
346 max_item_size: DEFAULT_MAX_ITEM_SIZE,
347 _codec: PhantomData,
348 }
349 }
350
351 pub async fn send(&self, value: T) -> Result<Sending<T>, SendError<T>> {
358 if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
359 return Err(SendError::from_remote_send_error(err.clone(), value));
360 }
361
362 match self.tx.upgrade() {
363 Some(tx) => {
364 let (req, sent) = send_req(Ok(value));
365 match tx.send(req).await {
366 Ok(()) => Ok(sent),
367 Err(err) => Err(SendError::Closed(err.0.value.expect("unreachable"))),
368 }
369 }
370 None => Err(SendError::Closed(value)),
371 }
372 }
373
374 pub fn try_send(&self, value: T) -> Result<Sending<T>, TrySendError<T>> {
381 if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
382 return Err(TrySendError::from_remote_send_error(err.clone(), value));
383 }
384
385 match self.tx.upgrade() {
386 Some(tx) => {
387 let (req, sent) = send_req(Ok(value));
388 match tx.try_send(req) {
389 Ok(()) => Ok(sent),
390 Err(tokio::sync::mpsc::error::TrySendError::Full(err)) => {
391 Err(TrySendError::Full(err.value.expect("unreachable")))
392 }
393 Err(tokio::sync::mpsc::error::TrySendError::Closed(err)) => {
394 Err(TrySendError::Closed(err.value.expect("unreachable")))
395 }
396 }
397 }
398 None => Err(TrySendError::Closed(value)),
399 }
400 }
401
402 pub fn blocking_send(&self, value: T) -> Result<Sending<T>, SendError<T>> {
412 exec::task::block_on(self.send(value))
413 }
414
415 pub async fn reserve(&self) -> Result<Permit<T>, SendError<()>> {
423 if let Some(err) = self.remote_send_err_rx.borrow().as_ref() {
424 return Err(SendError::from_remote_send_error(err.clone(), ()));
425 }
426
427 match self.tx.upgrade() {
428 Some(tx) => {
429 let tx = (*tx).clone();
430 match tx.reserve_owned().await {
431 Ok(permit) => Ok(Permit(permit)),
432 Err(_) => Err(SendError::Closed(())),
433 }
434 }
435 _ => Err(SendError::Closed(())),
436 }
437 }
438
439 pub fn capacity(&self) -> usize {
443 match self.tx.upgrade() {
444 Some(tx) => tx.capacity(),
445 None => 0,
446 }
447 }
448
449 pub async fn closed(&self) {
453 let mut closed = self.closed_rx.clone();
454 while closed.borrow().is_none() {
455 if closed.changed().await.is_err() {
456 break;
457 }
458 }
459 }
460
461 pub fn closed_reason(&self) -> Option<ClosedReason> {
465 match (self.closed_rx.borrow().clone(), self.remote_send_err_rx.borrow().as_ref()) {
466 (Some(reason), _) => Some(reason),
467 (None, Some(_)) => Some(ClosedReason::Failed),
468 (None, None) => None,
469 }
470 }
471
472 pub fn is_closed(&self) -> bool {
476 self.closed_reason().is_some()
477 }
478
479 pub fn set_codec<NewCodec>(self) -> Sender<T, NewCodec, BUFFER> {
481 Sender {
482 tx: self.tx.clone(),
483 closed_rx: self.closed_rx.clone(),
484 remote_send_err_rx: self.remote_send_err_rx.clone(),
485 dropped_tx: self.dropped_tx.clone(),
486 max_item_size: self.max_item_size,
487 _codec: PhantomData,
488 }
489 }
490
491 pub fn set_buffer<const NEW_BUFFER: usize>(self) -> Sender<T, Codec, NEW_BUFFER> {
493 assert!(NEW_BUFFER > 0, "buffer size must not be zero");
494 Sender {
495 tx: self.tx.clone(),
496 closed_rx: self.closed_rx.clone(),
497 remote_send_err_rx: self.remote_send_err_rx.clone(),
498 dropped_tx: self.dropped_tx.clone(),
499 max_item_size: self.max_item_size,
500 _codec: PhantomData,
501 }
502 }
503
504 pub fn max_item_size(&self) -> usize {
506 self.max_item_size
507 }
508
509 pub fn set_max_item_size(&mut self, max_item_size: usize) {
511 self.max_item_size = max_item_size;
512 }
513}
514
/// A reserved slot in the local send queue, obtained from `Sender::reserve`.
///
/// Wraps the owned permit of the underlying tokio queue.
pub struct Permit<T>(tokio::sync::mpsc::OwnedPermit<SendReq<T>>);
517
518impl<T> Permit<T>
519where
520 T: Send,
521{
522 pub fn send(self, value: T) -> Sending<T> {
524 let (req, sent) = send_req(Ok(value));
525 self.0.send(req);
526 sent
527 }
528}
529
// Intentionally empty destructor: no explicit cleanup is performed here, the fields are
// simply dropped as usual.
// NOTE(review): one visible effect of having this impl is that fields cannot be moved out
// of a `Sender`, which is why `set_codec`/`set_buffer` clone them — confirm whether that
// is the reason it exists.
impl<T, Codec, const BUFFER: usize> Drop for Sender<T, Codec, BUFFER> {
    fn drop(&mut self) {
    }
}
535
536impl<T, Codec, const BUFFER: usize> Serialize for Sender<T, Codec, BUFFER>
537where
538 T: RemoteSend,
539 Codec: codec::Codec,
540{
541 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
543 where
544 S: serde::Serializer,
545 {
546 let port = match self.tx.upgrade() {
547 Some(tx) => {
549 let closed_rx = self.closed_rx.clone();
551 let remote_send_err_rx = self.remote_send_err_rx.clone();
552 let max_item_size = self.max_item_size;
553
554 Some(PortSerializer::connect(move |connect| {
555 async move {
556 let (raw_tx, raw_rx) = match connect.await {
558 Ok(tx_rx) => tx_rx,
559 Err(err) => {
560 let _ = tx.send(SendReq::new(Err(RecvError::RemoteConnect(err)))).await;
561 return;
562 }
563 };
564
565 super::recv_impl::<T, Codec>(
566 &tx,
567 raw_tx,
568 raw_rx,
569 remote_send_err_rx,
570 closed_rx,
571 max_item_size,
572 )
573 .await;
574 }
575 .boxed()
576 })?)
577 }
578 None => {
579 None
581 }
582 };
583
584 let transported = TransportedSender::<T, Codec> {
586 port,
587 data: PhantomData,
588 codec: PhantomData,
589 max_item_size: self.max_item_size.try_into().unwrap_or(u64::MAX),
590 };
591 transported.serialize(serializer)
592 }
593}
594
595impl<'de, T, Codec, const BUFFER: usize> Deserialize<'de> for Sender<T, Codec, BUFFER>
596where
597 T: RemoteSend,
598 Codec: codec::Codec,
599{
600 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
602 where
603 D: serde::Deserializer<'de>,
604 {
605 assert!(BUFFER > 0, "BUFFER must not be zero");
606
607 let TransportedSender { port, max_item_size, .. } =
609 TransportedSender::<T, Codec>::deserialize(deserializer)?;
610 let max_item_size = usize::try_from(max_item_size).unwrap_or(usize::MAX);
611
612 match port {
613 Some(port) => {
615 let (tx, rx) = tokio::sync::mpsc::channel(BUFFER);
617 let (closed_tx, closed_rx) = tokio::sync::watch::channel(None);
618 let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::watch::channel(None);
619
620 PortDeserializer::accept(port, move |local_port, request| {
622 async move {
623 let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
625 Ok(tx_rx) => tx_rx,
626 Err(err) => {
627 let _ = remote_send_err_tx.send(Some(RemoteSendError::Listen(err)));
628 return;
629 }
630 };
631
632 super::send_impl::<T, Codec>(
633 rx,
634 raw_tx,
635 raw_rx,
636 remote_send_err_tx,
637 closed_tx,
638 max_item_size,
639 )
640 .await;
641 }
642 .boxed()
643 })?;
644
645 Ok(Self::new(tx, closed_rx, remote_send_err_rx))
646 }
647
648 None => Ok(Self::new_closed()),
650 }
651 }
652}
653
/// Output of the reservation future used by [`SenderSink`]: the result of the reservation
/// together with the sender that was moved into the future, handed back so that the next
/// reservation can be started with it.
type ReserveRet<T, Codec, const BUFFER: usize> = (Result<Permit<T>, SendError<()>>, Sender<T, Codec, BUFFER>);
655
/// Adapter that exposes a [`Sender`] through the [`Sink`] trait.
pub struct SenderSink<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
    /// The wrapped sender; `None` once the sink has been closed or if it was created closed.
    tx: Option<Sender<T, Codec, BUFFER>>,
    /// Permit obtained by `poll_ready` and consumed by `start_send`.
    permit: Option<Permit<T>>,
    /// Reusable future that performs the next reservation; `None` when the sink is closed.
    reserve: Option<ReusableBoxFuture<'static, ReserveRet<T, Codec, BUFFER>>>,
    /// Handle of the most recently started send, polled by `poll_flush`.
    sending: Option<Sending<T>>,
}
663
664impl<T, Codec, const BUFFER: usize> fmt::Debug for SenderSink<T, Codec, BUFFER> {
665 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
666 f.debug_struct("SenderSink").field("ready", &self.permit.is_some()).finish()
667 }
668}
669
670impl<T, Codec, const BUFFER: usize> SenderSink<T, Codec, BUFFER>
671where
672 T: Send + 'static,
673 Codec: codec::Codec,
674{
675 pub fn new(tx: Sender<T, Codec, BUFFER>) -> Self {
677 Self {
678 tx: Some(tx.clone()),
679 permit: None,
680 reserve: Some(ReusableBoxFuture::new(Self::make_reserve(tx))),
681 sending: None,
682 }
683 }
684
685 fn new_closed() -> Self {
686 Self { tx: None, permit: None, reserve: None, sending: None }
687 }
688
689 pub fn get_ref(&self) -> Option<&Sender<T, Codec, BUFFER>> {
693 self.tx.as_ref()
694 }
695
696 async fn make_reserve(tx: Sender<T, Codec, BUFFER>) -> ReserveRet<T, Codec, BUFFER> {
697 let result = tx.reserve().await;
698 (result, tx)
699 }
700}
701
702impl<T, Codec, const BUFFER: usize> Clone for SenderSink<T, Codec, BUFFER>
703where
704 T: Send + 'static,
705 Codec: codec::Codec,
706{
707 fn clone(&self) -> Self {
708 match self.tx.clone() {
709 Some(tx) => Self::new(tx),
710 None => Self::new_closed(),
711 }
712 }
713}
714
715impl<T, Codec, const BUFFER: usize> Sink<T> for SenderSink<T, Codec, BUFFER>
716where
717 T: Send + 'static,
718 Codec: codec::Codec,
719{
720 type Error = SendError<()>;
721
722 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
723 if self.permit.is_some() {
724 return Poll::Ready(Ok(()));
725 }
726
727 let Some(reserve) = self.reserve.as_mut() else { return Poll::Ready(Err(SendError::Closed(()))) };
728 let (permit, tx) = ready!(reserve.poll(cx));
729 reserve.set(Self::make_reserve(tx));
730
731 self.permit = Some(permit?);
732
733 Poll::Ready(Ok(()))
734 }
735
736 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
737 let permit = self.permit.take().expect("SenderSink is not ready for sending");
738 self.sending = Some(permit.send(item));
739 Ok(())
740 }
741
742 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
743 let Some(sending) = self.sending.as_mut() else { return Poll::Ready(Ok(())) };
744
745 let res = ready!(sending.poll_unpin(cx));
746 self.sending = None;
747
748 Poll::Ready(res.map_err(|err| match err {
749 SendingError::Send(base) => SendError::RemoteSend(base.kind),
750 SendingError::Dropped => SendError::Closed(()),
751 }))
752 }
753
754 fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
755 self.tx = None;
756 self.permit = None;
757 self.reserve = None;
758 Poll::Ready(Ok(()))
759 }
760}
761
// Conversion shorthand for `SenderSink::new`.
impl<T, Codec, const BUFFER: usize> From<Sender<T, Codec, BUFFER>> for SenderSink<T, Codec, BUFFER>
where
    T: Send + 'static,
    Codec: codec::Codec,
{
    fn from(tx: Sender<T, Codec, BUFFER>) -> Self {
        Self::new(tx)
    }
}