use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use collections::PriorityQueue;
use futures::{Stream, StreamExt};
use mpsc::with_priority_channel;
use parking_lot::RwLock;
use scopeguard::defer;
use tokio::sync::oneshot;
use tonic::codegen::InterceptedService;
use tonic::metadata::Ascii;
use tonic::service::Interceptor;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint};
use tonic::{metadata::MetadataValue, Request, Status};

use super::transferpb::data_transfer_client::DataTransferClient;
pub use super::transferpb::{self, Message};
use super::{
    split_into_chunks, ChunkedBuffer, Error, Id, Priority, Result, CHUNK_SIZE_LIMIT,
    RECV_CHUNKS_TIMEOUT,
};

type Sender<T> = mpsc::Sender<T, mpsc::SendError<T>>;

type PriorityQueueType = Arc<parking_lot::RwLock<PriorityQueue<Priority, Message>>>;

type DataTransferClientType = DataTransferClient<InterceptedService<Channel, AuthInterceptor>>;

type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;

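/// Builder for [`Client`]. Obtain one via [`Client::new`], adjust the options,
/// then call [`connect`](Self::connect) or [`connect_lazy`](Self::connect_lazy).
///
/// A minimal usage sketch (the address and token are placeholders):
///
/// ```ignore
/// let client = Client::new("127.0.0.1:5555".into())
///     .concurrency_limit(16)
///     .connect_timeout(Duration::from_secs(5))
///     .timeout(Duration::from_secs(30))
///     .auth_token(Some("my-token".into()))
///     .connect()
///     .await?;
/// ```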
pub struct ClientBuilder {
    addr: String,
    concurrency_limit: usize,
    connect_timeout: Option<Duration>,
    timeout: Option<Duration>,
    tls: bool,
    tls_ca: Option<String>,
    tls_domain: Option<String>,
    auth_token: Option<String>,
    chunk_size: usize,
    recv_chunks_timeout: Duration,
}

impl Default for ClientBuilder {
    fn default() -> Self {
        Self {
            addr: Default::default(),
            concurrency_limit: 10,
            connect_timeout: None,
            timeout: None,
            tls: false,
            tls_ca: None,
            tls_domain: None,
            auth_token: None,
            chunk_size: CHUNK_SIZE_LIMIT,
            recv_chunks_timeout: RECV_CHUNKS_TIMEOUT,
        }
    }
}

impl ClientBuilder {
    pub async fn connect(self) -> Result<Client> {
        let inner = connect(
            self.addr.as_str(),
            self.concurrency_limit,
            self.connect_timeout,
            self.timeout,
            self.tls,
            self.tls_ca.as_ref(),
            self.tls_domain.as_ref(),
            self.auth_token.clone(),
        )
        .await?;
        Ok(Client {
            inner,
            builder: Arc::new(self),
        })
    }

    pub fn connect_lazy(self) -> Result<Client> {
        let inner = connect_lazy(
            self.addr.as_str(),
            self.concurrency_limit,
            self.connect_timeout,
            self.timeout,
            self.tls,
            self.tls_ca.as_ref(),
            self.tls_domain.as_ref(),
            self.auth_token.clone(),
        )?;
        Ok(Client {
            inner,
            builder: Arc::new(self),
        })
    }

    pub fn concurrency_limit(mut self, concurrency_limit: usize) -> Self {
        self.concurrency_limit = concurrency_limit;
        self
    }

    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
        self.connect_timeout = Some(connect_timeout);
        self
    }

    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    pub fn tls(mut self, tls_ca: Option<String>, tls_domain: Option<String>) -> Self {
        self.tls = true;
        self.tls_ca = tls_ca;
        self.tls_domain = tls_domain;
        self
    }

    pub fn auth_token(mut self, token: Option<String>) -> Self {
        self.auth_token = token;
        self
    }

    pub fn chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    pub fn recv_chunks_timeout(mut self, recv_chunks_timeout: Duration) -> Self {
        self.recv_chunks_timeout = recv_chunks_timeout;
        self
    }
}

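/// gRPC data-transfer client. Cloning is cheap: the underlying tonic channel
/// is shared, so clones multiplex over the same connection.
///
/// A minimal request/response sketch (the address is a placeholder):
///
/// ```ignore
/// let mut client = Client::new("127.0.0.1:5555".into()).connect().await?;
/// let reply = client.send(b"ping".to_vec()).await?;
/// ```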
#[derive(Clone)]
pub struct Client {
    inner: DataTransferClientType,
    builder: Arc<ClientBuilder>,
}

impl Client {
    #[inline]
    #[allow(clippy::new_ret_no_self)]
    pub fn new(addr: String) -> ClientBuilder {
        ClientBuilder {
            addr,
            ..Default::default()
        }
    }

    /// Returns the underlying gRPC client; the channel itself handles
    /// (re)connection, so this is just an accessor.
    #[inline]
    fn connect(&mut self) -> &mut DataTransferClientType {
        &mut self.inner
    }

    #[inline]
    pub async fn send(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
        self.send_priority(data, Priority::MIN).await
    }

    #[inline]
    pub async fn send_priority(&mut self, data: Vec<u8>, p: Priority) -> Result<Vec<u8>> {
        if let Some(t) = self.builder.timeout {
            tokio::time::timeout(t, self._send_priority(data, p)).await?
        } else {
            self._send_priority(data, p).await
        }
    }

    #[inline]
    async fn _send_priority(&mut self, data: Vec<u8>, p: Priority) -> Result<Vec<u8>> {
        let chunk_size = self.builder.chunk_size;
        let timeout = self.builder.timeout;
        let c = self.connect();
        if data.len() > chunk_size {
            // Oversized payloads are split into chunks that share one message id;
            // the first non-empty response carries the reply for the whole message.
            let mut resp_data = None;
            for msg in split_into_chunks(next_id(), data.as_slice(), p, chunk_size) {
                let req = if let Some(t) = timeout {
                    let mut req = tonic::Request::new(msg);
                    req.set_timeout(t);
                    req
                } else {
                    tonic::Request::new(msg)
                };
                let resp = c.send(req).await.map_err(Error::new)?;
                let data = resp.into_inner().data;
                if resp_data.is_none() && data.is_some() {
                    resp_data = data;
                }
            }
            if let Some(resp_data) = resp_data {
                Ok(resp_data)
            } else {
                Err(anyhow!("no response data received for chunked message"))
            }
        } else {
            let msg = Message {
                id: next_id(),
                priority: p,
                total_chunks: 0,
                chunk_index: 0,
                data: Some(data),
                ..Default::default()
            };
            let req = if let Some(t) = timeout {
                let mut req = tonic::Request::new(msg);
                req.set_timeout(t);
                req
            } else {
                tonic::Request::new(msg)
            };
            let resp = c.send(req).await.map_err(Error::new);
            let msg = resp?.into_inner();
            Ok(msg.data.unwrap_or_default())
        }
    }

    #[inline]
    pub async fn transfer_start(&mut self, req_queue_cap: usize) -> Mailbox {
        let mut this = self.clone();
        let req_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
        let (tx, rx) = with_priority_channel(req_queue.clone(), req_queue_cap);
        let rx = Receiver::new(rx);
        let mailbox = Mailbox::new(tx, req_queue, req_queue_cap, self.builder.chunk_size);
        let addr = self.builder.addr.clone();
        tokio::spawn(async move {
            loop {
                log::trace!("gRPC call transfer ... ");
                // On failure, retry the streaming call after a short backoff.
                if let Err(e) = this.connect().transfer(Request::new(rx.clone())).await {
                    log::warn!(
                        "gRPC call transfer failure, addr: {}, {}",
                        addr,
                        e.to_string()
                    );
                    tokio::time::sleep(Duration::from_secs(3)).await;
                    continue;
                }

                log::info!(
                    "transfer exited, addr: {:?}, is_closed: {}",
                    this.builder.addr,
                    rx.is_closed()
                );
                break;
            }
        });
        mailbox
    }

    #[inline]
    pub async fn duplex_transfer_start(&mut self, queue_cap: usize) -> DuplexMailbox {
        let mut this = self.clone();
        let req_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
        let (req_tx, req_rx) = with_priority_channel(req_queue.clone(), queue_cap);
        let req_rx = Receiver::new(req_rx);

        let resp_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
        let (mut resp_tx, resp_rx) = with_priority_channel(resp_queue.clone(), queue_cap);
        let resp_rx = Receiver::new(resp_rx);

        let mailbox = DuplexMailbox::new(
            req_tx,
            resp_rx,
            req_queue,
            resp_queue,
            queue_cap,
            self.builder.chunk_size,
            self.builder.recv_chunks_timeout,
            self.builder.timeout,
        );
        let addr = self.builder.addr.clone();
        tokio::spawn(async move {
            'outer: loop {
                log::trace!("gRPC call duplex transfer ... ");
                match this
                    .connect()
                    .duplex_transfer(Request::new(req_rx.clone()))
                    .await
                {
                    Err(e) => {
                        log::warn!(
                            "gRPC call duplex transfer failure, addr: {}, {}",
                            addr,
                            e.to_string()
                        );
                        tokio::time::sleep(Duration::from_secs(3)).await;
                        continue;
                    }
                    Ok(resp) => {
                        // Forward every received response into the priority channel
                        // that feeds the DuplexMailbox.
                        let mut resp_stream = resp.into_inner();
                        while let Some(received) = resp_stream.next().await {
                            match received {
                                Err(e) => {
                                    log::warn!(
                                        "gRPC duplex transfer response stream recv failure, addr: {}, {}",
                                        addr,
                                        e.to_string()
                                    );
                                    tokio::time::sleep(Duration::from_secs(3)).await;
                                    continue 'outer;
                                }
                                Ok(received) => {
                                    if let Err(e) =
                                        resp_tx.send((received.priority, received)).await
                                    {
                                        log::warn!(
                                            "gRPC duplex transfer send response message failure, addr: {}, {}",
                                            addr,
                                            e.to_string()
                                        );
                                    }
                                }
                            }
                        }
                        log::warn!(
                            "gRPC duplex transfer response stream ended, addr: {}",
                            addr,
                        );
                        tokio::time::sleep(Duration::from_secs(3)).await;
                        continue;
                    }
                }
            }
        });
        mailbox
    }
}

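/// Handle for the one-way streaming transfer started by
/// [`Client::transfer_start`]. Messages are queued by priority and pushed to
/// the server from a background task; no per-message response is returned.
///
/// A minimal usage sketch (the queue capacity is a placeholder and `client`
/// is assumed to be a connected [`Client`]):
///
/// ```ignore
/// let mut mailbox = client.transfer_start(1024).await;
/// mailbox.send(b"fire-and-forget".to_vec()).await?;
/// mailbox.quick_send(b"urgent".to_vec()).await?; // sends with Priority::MAX
/// ```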
#[derive(Clone)]
pub struct Mailbox {
    req_tx: Sender<(Priority, Message)>,
    req_queue: PriorityQueueType,
    req_queue_cap: usize,
    chunk_size: usize,
}

impl Mailbox {
    #[inline]
    fn new(
        req_tx: Sender<(Priority, Message)>,
        req_queue: PriorityQueueType,
        req_queue_cap: usize,
        chunk_size: usize,
    ) -> Self {
        Self {
            req_tx,
            req_queue,
            req_queue_cap,
            chunk_size,
        }
    }

    #[inline]
    pub fn req_queue_is_full(&self) -> bool {
        self.req_queue_len() >= self.req_queue_cap
    }

    #[inline]
    pub fn req_queue_len(&self) -> usize {
        self.req_queue.read().len()
    }

    #[inline]
    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
        self.send_priority(data, Priority::MIN).await
    }

    #[inline]
    pub async fn send_priority(
        &mut self,
        data: Vec<u8>,
        p: Priority,
    ) -> Result<(), SendError<Vec<u8>>> {
        if data.len() > self.chunk_size {
            for msg in split_into_chunks(next_id(), data.as_slice(), p, self.chunk_size) {
                self.req_tx.send((p, msg)).await.map_err(Self::error)?;
            }
            Ok(())
        } else {
            let msg = Message {
                id: next_id(),
                priority: p,
                total_chunks: 0,
                chunk_index: 0,
                data: Some(data),
                ..Default::default()
            };
            self.req_tx.send((p, msg)).await.map_err(Self::error)
        }
    }

    #[inline]
    pub async fn quick_send(&mut self, data: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
        self.send_priority(data, Priority::MAX).await
    }

    #[inline]
    fn error(e: mpsc::SendError<(Priority, Message)>) -> SendError<Vec<u8>> {
        if e.is_full() {
            e.into_inner()
                .map(|(_, msg)| SendError::<Vec<u8>>::full(msg.data.unwrap_or_default()))
                .unwrap_or_else(|| SendError::<Vec<u8>>::disconnected(None))
        } else if e.is_disconnected() {
            SendError::<Vec<u8>>::disconnected(
                e.into_inner().map(|(_, msg)| msg.data.unwrap_or_default()),
            )
        } else {
            SendError::<Vec<u8>>::disconnected(None)
        }
    }
}

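/// Handle for the bidirectional stream started by
/// [`Client::duplex_transfer_start`]. Each send registers a oneshot sender
/// keyed by message id; a background task reassembles chunked responses and
/// routes each one back to its waiting caller, so `send` resolves to the
/// server's reply for that message.
///
/// A minimal usage sketch (the queue capacity is a placeholder and `client`
/// is assumed to be a connected [`Client`]):
///
/// ```ignore
/// let mut mailbox = client.duplex_transfer_start(1024).await;
/// let reply = mailbox.send(b"request".to_vec()).await?;
/// ```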
#[derive(Clone)]
pub struct DuplexMailbox {
    req_tx: Sender<(Priority, Message)>,
    req_queue: PriorityQueueType,
    resp_queue: PriorityQueueType,
    resp_senders: Arc<DashMap<Id, oneshot::Sender<Result<Vec<u8>>>>>,
    queue_cap: usize,
    chunk_size: usize,
    timeout: Option<Duration>,
}

#[allow(clippy::too_many_arguments)]
impl DuplexMailbox {
    #[inline]
    fn new(
        req_tx: Sender<(Priority, Message)>,
        resp_rx: Receiver,
        req_queue: PriorityQueueType,
        resp_queue: PriorityQueueType,
        queue_cap: usize,
        chunk_size: usize,
        recv_chunks_timeout: Duration,
        timeout: Option<Duration>,
    ) -> Self {
        let resp_chunked_buffer = ChunkedBuffer::new(recv_chunks_timeout);
        Self {
            req_tx,
            req_queue,
            resp_queue,
            resp_senders: Arc::new(DashMap::default()),
            queue_cap,
            chunk_size,
            timeout,
        }
        .start(resp_rx, resp_chunked_buffer)
    }

    fn start(self, mut resp_rx: Receiver, resp_chunked_buffer: ChunkedBuffer) -> Self {
        let resp_senders = self.resp_senders.clone();
        tokio::spawn(async move {
            let mut removed_ids = Vec::new();
            while let Some(mut msg) = resp_rx.next().await {
                // Server-side errors are routed straight to the waiting sender.
                if let Some(err) = msg.err.take() {
                    if let Some((_, resp_sender)) = resp_senders.remove(&msg.id) {
                        if !resp_sender.is_closed() {
                            if let Err(e) = resp_sender.send(Err(anyhow!(err))) {
                                log::warn!("response sender send fail, {:?}", e);
                            }
                        } else {
                            log::warn!("response sender is closed");
                        }
                    }
                    continue;
                }

                // Reassemble chunked responses; `merge` yields the complete
                // message once all chunks have arrived.
                let msg = resp_chunked_buffer
                    .merge(msg, None, Some(&mut removed_ids))
                    .await;

                // Drop senders for ids the chunk buffer evicted (e.g. on
                // reassembly timeout).
                if !removed_ids.is_empty() {
                    for removed_id in removed_ids.drain(..) {
                        log::debug!("removed_id: {}", removed_id);
                        resp_senders.remove(&removed_id);
                    }
                }

                let (id, data) = if let Some((id, _, data)) = msg {
                    (id, data)
                } else {
                    continue;
                };

                if let Some((_, resp_sender)) = resp_senders.remove(&id) {
                    if !resp_sender.is_closed() {
                        if let Err(e) = resp_sender.send(Ok(data)) {
                            log::warn!("response sender send fail, {:?}", e);
                        }
                    } else {
                        log::warn!("response sender is closed");
                    }
                }
            }
            log::info!("response receiver task exited");
        });
        self
    }

    #[inline]
    pub fn req_queue_is_full(&self) -> bool {
        self.req_queue_len() >= self.queue_cap
    }

    #[inline]
    pub fn req_queue_len(&self) -> usize {
        self.req_queue.read().len()
    }

    #[inline]
    pub fn resp_queue_len(&self) -> usize {
        self.resp_queue.read().len()
    }

    #[inline]
    pub fn resp_senders_len(&self) -> usize {
        self.resp_senders.len()
    }

    #[inline]
    pub async fn send(&mut self, data: Vec<u8>) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
        self.send_priority(data, Priority::MIN).await
    }

    #[inline]
    pub async fn send_priority(
        &mut self,
        data: Vec<u8>,
        p: Priority,
    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
        let (res_tx, res_rx) = oneshot::channel::<Result<Vec<u8>>>();
        let id = next_id();
        let resp_senders = self.resp_senders.clone();
        resp_senders.insert(id, res_tx);
        // Ensure the sender entry is removed even if this future is dropped.
        defer! {
            resp_senders.remove(&id);
        }
        self._send_priority(id, data, p, res_rx).await
    }

    #[inline]
    async fn _send_priority(
        &mut self,
        id: Id,
        data: Vec<u8>,
        p: Priority,
        res_rx: oneshot::Receiver<Result<Vec<u8>>>,
    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
        if data.len() > self.chunk_size {
            for msg in split_into_chunks(id, data.as_slice(), p, self.chunk_size) {
                self.req_tx.send((p, msg)).await.map_err(Self::error)?;
            }
        } else {
            let msg = Message {
                id,
                priority: p,
                total_chunks: 0,
                chunk_index: 0,
                data: Some(data),
                ..Default::default()
            };
            self.req_tx.send((p, msg)).await.map_err(Self::error)?;
        }

        // Wait for the matching response; fall back to a 120s deadline when no
        // timeout is configured. The three error cases are: deadline elapsed,
        // response channel dropped, and an error returned by the server.
        let res = tokio::time::timeout(self.timeout.unwrap_or(Duration::from_secs(120)), res_rx)
            .await
            .map_err(|e| SendError::error(e.to_string(), None))?
            .map_err(|e| SendError::error(e.to_string(), None))?
            .map_err(|e| SendError::error(e.to_string(), None))?;

        Ok(res)
    }

    #[inline]
    pub async fn quick_send(
        &mut self,
        data: Vec<u8>,
    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
        self.send_priority(data, Priority::MAX).await
    }

    #[inline]
    fn error(e: mpsc::SendError<(Priority, Message)>) -> SendError<Option<Vec<u8>>> {
        if e.is_full() {
            e.into_inner()
                .map(|(_, msg)| SendError::<Option<Vec<u8>>>::full(msg.data))
                .unwrap_or_else(|| SendError::<Option<Vec<u8>>>::disconnected(None))
        } else if e.is_disconnected() {
            SendError::<Option<Vec<u8>>>::disconnected(e.into_inner().map(|(_, msg)| msg.data))
        } else {
            SendError::<Option<Vec<u8>>>::disconnected(None)
        }
    }
}

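/// tonic [`Interceptor`] that injects the configured bearer token into every
/// outgoing request as `authorization: Bearer <token>` metadata; requests
/// pass through unchanged when no token is configured.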
#[derive(Clone)]
struct AuthInterceptor {
    auth_token: Option<MetadataValue<Ascii>>,
}

impl Interceptor for AuthInterceptor {
    #[inline]
    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
        if let Some(token) = self.auth_token.clone() {
            request.metadata_mut().insert("authorization", token);
        }
        Ok(request)
    }
}

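/// Eagerly establishes the channel, failing if the endpoint is unreachable.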
#[allow(clippy::too_many_arguments)]
#[inline]
async fn connect(
    addr: &str,
    concurrency_limit: usize,
    connect_timeout: Option<Duration>,
    timeout: Option<Duration>,
    tls: bool,
    tls_ca: Option<&String>,
    tls_domain: Option<&String>,
    token: Option<String>,
) -> Result<DataTransferClientType> {
    let (endpoint, interceptor) = build_endpoint(
        addr,
        concurrency_limit,
        connect_timeout,
        timeout,
        tls,
        tls_ca,
        tls_domain,
        token,
    )?;

    let channel = endpoint.connect().await?;

    Ok(DataTransferClient::with_interceptor(channel, interceptor))
}

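/// Like [`connect`], but returns immediately; the channel is established on
/// first use.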
#[allow(clippy::too_many_arguments)]
#[inline]
fn connect_lazy(
    addr: &str,
    concurrency_limit: usize,
    connect_timeout: Option<Duration>,
    timeout: Option<Duration>,
    tls: bool,
    tls_ca: Option<&String>,
    tls_domain: Option<&String>,
    token: Option<String>,
) -> Result<DataTransferClientType> {
    let (endpoint, interceptor) = build_endpoint(
        addr,
        concurrency_limit,
        connect_timeout,
        timeout,
        tls,
        tls_ca,
        tls_domain,
        token,
    )?;

    let channel = endpoint.connect_lazy();

    Ok(DataTransferClient::with_interceptor(channel, interceptor))
}

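/// Builds the tonic [`Endpoint`] (a plain `http://` URI with optional TLS,
/// timeouts, and a concurrency limit of at least 1) together with the
/// [`AuthInterceptor`] shared by [`connect`] and [`connect_lazy`].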
#[allow(clippy::too_many_arguments)]
#[inline]
fn build_endpoint(
    addr: &str,
    concurrency_limit: usize,
    connect_timeout: Option<Duration>,
    timeout: Option<Duration>,
    tls: bool,
    tls_ca: Option<&String>,
    tls_domain: Option<&String>,
    token: Option<String>,
) -> Result<(Endpoint, AuthInterceptor)> {
    let tls_client_cfg = if tls {
        let mut tls_client_cfg = ClientTlsConfig::new();
        if let Some(tls_ca) = tls_ca {
            let pem = std::fs::read_to_string(tls_ca)?;
            tls_client_cfg = tls_client_cfg.ca_certificate(Certificate::from_pem(pem));
        }
        if let Some(tls_domain) = tls_domain {
            tls_client_cfg = tls_client_cfg.domain_name(tls_domain);
        }
        Some(tls_client_cfg)
    } else {
        None
    };

    let auth_token = if let Some(token) = token {
        if token.is_empty() {
            return Err(Error::msg("auth token is empty"));
        }
        Some(format!("Bearer {}", token).parse::<MetadataValue<_>>()?)
    } else {
        None
    };

    let concurrency_limit = if concurrency_limit == 0 {
        1
    } else {
        concurrency_limit
    };

    let endpoint = Channel::from_shared(format!("http://{}", addr)).map(|endpoint| {
        let mut endpoint = endpoint.concurrency_limit(concurrency_limit);
        if let Some(connect_timeout) = connect_timeout {
            endpoint = endpoint.connect_timeout(connect_timeout);
        }
        if let Some(timeout) = timeout {
            endpoint = endpoint.timeout(timeout);
        }
        if let Some(tls_client_cfg) = tls_client_cfg {
            endpoint.tls_config(tls_client_cfg)
        } else {
            Ok(endpoint)
        }
    })??;
    Ok((endpoint, AuthInterceptor { auth_token }))
}

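/// Adapts the priority-channel receiver into a `Stream<Item = Message>` that
/// tonic can consume as a request stream. It is `Clone` (the inner receiver
/// sits behind a shared lock) so the same stream can be resubmitted when the
/// transfer call is retried.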
#[derive(Clone)]
struct Receiver {
    rx: Arc<RwLock<mpsc::Receiver<(Priority, Message)>>>,
}

impl Receiver {
    fn new(rx: mpsc::Receiver<(Priority, Message)>) -> Self {
        Receiver {
            rx: Arc::new(RwLock::new(rx)),
        }
    }

    #[inline]
    pub fn is_closed(&self) -> bool {
        self.rx.read().is_closed()
    }
}

impl Stream for Receiver {
    type Item = Message;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The write lock is held only for the duration of this synchronous poll.
        match Pin::new(self.rx.write().deref_mut()).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some((_, msg))) => Poll::Ready(Some(msg)),
        }
    }
}

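/// Returns a process-wide unique, monotonically increasing message id
/// (starting at 1), used to correlate chunks and duplex responses.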
#[inline]
pub(crate) fn next_id() -> Id {
    use std::sync::atomic::{AtomicU64, Ordering};
    // `AtomicU64::new` is const, so a plain static suffices here; no lazy
    // initialization is needed.
    static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
    ID_GENERATOR.fetch_add(1, Ordering::SeqCst)
}

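/// Error returned by the mailbox send methods: either a channel-level
/// [`mpsc::SendError`] (queue full or disconnected, possibly carrying the
/// rejected payload) or a string error such as a response timeout.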
#[derive(Clone, PartialEq, Eq)]
pub enum SendError<T> {
    SendError(mpsc::SendError<T>),
    Error(String, Option<T>),
}

impl<T> SendError<T> {
    #[inline]
    pub fn full(val: T) -> Self {
        SendError::SendError(mpsc::SendError::full(val))
    }

    #[inline]
    pub fn disconnected(val: Option<T>) -> Self {
        SendError::SendError(mpsc::SendError::disconnected(val))
    }

    #[inline]
    pub fn error(e: String, val: Option<T>) -> Self {
        SendError::Error(e, val)
    }

    #[inline]
    pub fn is_full(&self) -> bool {
        if let SendError::SendError(e) = self {
            e.is_full()
        } else {
            false
        }
    }

    #[inline]
    pub fn is_disconnected(&self) -> bool {
        if let SendError::SendError(e) = self {
            e.is_disconnected()
        } else {
            false
        }
    }

    #[inline]
    pub fn into_inner(self) -> Option<T> {
        match self {
            SendError::SendError(inner) => inner.into_inner(),
            SendError::Error(_, val) => val,
        }
    }
}

impl<T> core::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            SendError::SendError(e) => write!(f, "{:?}", e),
            SendError::Error(e, _) => write!(f, "{}", e),
        }
    }
}

impl<T> core::fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            SendError::SendError(e) => f.debug_struct("SendError").field("reason", e).finish(),
            SendError::Error(e, _) => f.debug_struct("SendError").field("reason", e).finish(),
        }
    }
}

impl<T: core::any::Any> std::error::Error for SendError<T> {}