handy_grpc/
client.rs

1use std::ops::DerefMut;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::anyhow;
8use collections::PriorityQueue;
9use futures::{Stream, StreamExt};
10use mpsc::with_priority_channel;
11use parking_lot::RwLock;
12use scopeguard::defer;
13use tokio::sync::oneshot;
14use tonic::codegen::InterceptedService;
15use tonic::metadata::Ascii;
16use tonic::service::Interceptor;
17use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint};
18use tonic::{metadata::MetadataValue, Request, Status};
19
20use super::transferpb::data_transfer_client::DataTransferClient;
21pub use super::transferpb::{self, Message};
22use super::{
23    split_into_chunks, ChunkedBuffer, Error, Id, Priority, Result, CHUNK_SIZE_LIMIT,
24    RECV_CHUNKS_TIMEOUT,
25};
26
/// Priority-channel sender whose error type is `mpsc::SendError<T>`.
type Sender<T> = mpsc::Sender<T, mpsc::SendError<T>>;

/// Shared, lock-protected priority queue of messages ordered by `Priority`.
type PriorityQueueType = Arc<parking_lot::RwLock<PriorityQueue<Priority, Message>>>;

/// Generated data-transfer client whose channel is wrapped by `AuthInterceptor`.
type DataTransferClientType = DataTransferClient<InterceptedService<Channel, AuthInterceptor>>;

/// `dashmap::DashMap` using `ahash::RandomState` as its hasher.
type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
34
35pub struct ClientBuilder {
36    addr: String,
37    concurrency_limit: usize,
38    connect_timeout: Option<Duration>,
39    timeout: Option<Duration>,
40    tls: bool,
41    tls_ca: Option<String>,
42    tls_domain: Option<String>,
43    auth_token: Option<String>,
44    chunk_size: usize,
45    recv_chunks_timeout: Duration,
46}
47
48impl Default for ClientBuilder {
49    fn default() -> Self {
50        Self {
51            addr: Default::default(),
52            concurrency_limit: 10,
53            connect_timeout: None,
54            timeout: None,
55            tls: false,
56            tls_ca: None,
57            tls_domain: None,
58            auth_token: None,
59            chunk_size: CHUNK_SIZE_LIMIT,
60            recv_chunks_timeout: RECV_CHUNKS_TIMEOUT,
61        }
62    }
63}
64
65impl ClientBuilder {
66    pub async fn connect(self) -> Result<Client> {
67        let inner = connect(
68            self.addr.as_str(),
69            self.concurrency_limit,
70            self.connect_timeout,
71            self.timeout,
72            self.tls,
73            self.tls_ca.as_ref(),
74            self.tls_domain.as_ref(),
75            self.auth_token.clone(),
76        )
77        .await?;
78        Ok(Client {
79            inner,
80            builder: Arc::new(self),
81        })
82    }
83
84    pub fn connect_lazy(self) -> Result<Client> {
85        let inner = connect_lazy(
86            self.addr.as_str(),
87            self.concurrency_limit,
88            self.connect_timeout,
89            self.timeout,
90            self.tls,
91            self.tls_ca.as_ref(),
92            self.tls_domain.as_ref(),
93            self.auth_token.clone(),
94        )?;
95        Ok(Client {
96            inner,
97            builder: Arc::new(self),
98        })
99    }
100
101    pub fn concurrency_limit(mut self, concurrency_limit: usize) -> Self {
102        self.concurrency_limit = concurrency_limit;
103        self
104    }
105
106    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
107        self.connect_timeout = Some(connect_timeout);
108        self
109    }
110
111    pub fn timeout(mut self, timeout: Duration) -> Self {
112        self.timeout = Some(timeout);
113        self
114    }
115
116    pub fn tls(mut self, tls_ca: Option<String>, tls_domain: Option<String>) -> Self {
117        self.tls = true;
118        self.tls_ca = tls_ca;
119        self.tls_domain = tls_domain;
120        self
121    }
122
123    pub fn auth_token(mut self, token: Option<String>) -> Self {
124        self.auth_token = token;
125        self
126    }
127
128    pub fn chunk_size(mut self, chunk_size: usize) -> Self {
129        self.chunk_size = chunk_size;
130        self
131    }
132
133    pub fn recv_chunks_timeout(mut self, recv_chunks_timeout: Duration) -> Self {
134        self.recv_chunks_timeout = recv_chunks_timeout;
135        self
136    }
137}
138
/// gRPC data-transfer client.
///
/// Cloning copies the generated client handle and shares the builder configuration.
#[derive(Clone)]
pub struct Client {
    /// Generated gRPC client, wrapped with the auth interceptor.
    inner: DataTransferClientType,
    /// Configuration this client was built from (chunk size, timeouts, address, ...).
    builder: Arc<ClientBuilder>,
}
144
145impl Client {
146    #[inline]
147    #[allow(clippy::new_ret_no_self)]
148    pub fn new(addr: String) -> ClientBuilder {
149        ClientBuilder {
150            addr,
151            ..Default::default()
152        }
153    }
154
155    #[inline]
156    fn connect(&mut self) -> &mut DataTransferClientType {
157        &mut self.inner
158    }
159
160    //@TODO 使用双向流传输方式替换直接响应试传输数据
161    #[inline]
162    pub async fn send(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
163        self.send_priority(data, Priority::MIN).await
164    }
165
166    #[inline]
167    pub async fn send_priority(&mut self, data: Vec<u8>, p: Priority) -> Result<Vec<u8>> {
168        if let Some(t) = self.builder.timeout {
169            tokio::time::timeout(t, self._send_priority(data, p)).await?
170        } else {
171            self._send_priority(data, p).await
172        }
173    }
174
175    #[inline]
176    async fn _send_priority(&mut self, data: Vec<u8>, p: Priority) -> Result<Vec<u8>> {
177        let chunk_size = self.builder.chunk_size;
178        let timeout = self.builder.timeout;
179        let c = self.connect();
180        if data.len() > chunk_size {
181            //chunked send
182            let mut resp_data = None;
183            for msg in split_into_chunks(next_id(), data.as_slice(), p, chunk_size) {
184                let req = if let Some(t) = timeout {
185                    let mut req = tonic::Request::new(msg);
186                    req.set_timeout(t);
187                    req
188                } else {
189                    tonic::Request::new(msg)
190                };
191                let resp = c.send(req).await.map_err(Error::new)?;
192                let data = resp.into_inner().data;
193                if resp_data.is_none() && data.is_some() {
194                    resp_data = data;
195                }
196            }
197            if let Some(resp_data) = resp_data {
198                Ok(resp_data)
199            } else {
200                Err(anyhow!("Timeout"))
201            }
202        } else {
203            let msg = Message {
204                id: next_id(),
205                priority: p,
206                total_chunks: 0,
207                chunk_index: 0,
208                data: Some(data),
209                ..Default::default()
210            };
211            let req = if let Some(t) = timeout {
212                let mut req = tonic::Request::new(msg);
213                req.set_timeout(t);
214                req
215            } else {
216                tonic::Request::new(msg)
217            };
218            let resp = c.send(req).await.map_err(Error::new);
219            let msg = resp?.into_inner();
220            Ok(msg.data.unwrap_or_default())
221        }
222    }
223
224    #[inline]
225    pub async fn transfer_start(&mut self, req_queue_cap: usize) -> Mailbox {
226        let mut this = self.clone();
227        let req_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
228        let (tx, rx) = with_priority_channel(req_queue.clone(), req_queue_cap);
229        let rx = Receiver::new(rx);
230        let mailbox = Mailbox::new(tx, req_queue, req_queue_cap, self.builder.chunk_size);
231        let addr = self.builder.addr.clone();
232        tokio::spawn(async move {
233            loop {
234                log::trace!("gRPC call transfer ... ");
235                if let Err(e) = this.connect().transfer(Request::new(rx.clone())).await {
236                    log::warn!(
237                        "gRPC call transfer failure, addr:{}, {}",
238                        addr,
239                        e.to_string()
240                    );
241                    tokio::time::sleep(Duration::from_secs(3)).await;
242                    continue;
243                }
244
245                log::info!(
246                    "transfer is exit, addr: {:?}, is_closed: {}",
247                    this.builder.addr,
248                    rx.is_closed()
249                );
250                break;
251            }
252        });
253        mailbox
254    }
255
256    #[inline]
257    pub async fn duplex_transfer_start(&mut self, queue_cap: usize) -> DuplexMailbox {
258        let mut this = self.clone();
259        let req_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
260        let (req_tx, req_rx) = with_priority_channel(req_queue.clone(), queue_cap);
261        let req_rx = Receiver::new(req_rx);
262
263        let resp_queue = Arc::new(parking_lot::RwLock::new(PriorityQueue::default()));
264        let (mut resp_tx, resp_rx) = with_priority_channel(resp_queue.clone(), queue_cap);
265        let resp_rx = Receiver::new(resp_rx);
266
267        let mailbox = DuplexMailbox::new(
268            req_tx,
269            resp_rx,
270            req_queue,
271            resp_queue,
272            queue_cap,
273            self.builder.chunk_size,
274            self.builder.recv_chunks_timeout,
275            self.builder.timeout,
276        );
277        let addr = self.builder.addr.clone();
278        tokio::spawn(async move {
279            'outer: loop {
280                log::trace!("gRPC call duplex transfer ... ");
281                match this
282                    .connect()
283                    .duplex_transfer(Request::new(req_rx.clone()))
284                    .await
285                {
286                    Err(e) => {
287                        log::warn!(
288                            "gRPC call duplex transfer failure, addr:{}, {}",
289                            addr,
290                            e.to_string()
291                        );
292                        tokio::time::sleep(Duration::from_secs(3)).await;
293                        continue;
294                    }
295                    Ok(resp) => {
296                        let mut resp_stream = resp.into_inner();
297                        while let Some(received) = resp_stream.next().await {
298                            match received {
299                                Err(e) => {
300                                    log::warn!(
301                                        "gRPC duplex transfer response stream recv failure, addr:{}, {}",
302                                        addr,
303                                        e.to_string()
304                                    );
305                                    tokio::time::sleep(Duration::from_secs(3)).await;
306                                    continue 'outer;
307                                }
308                                Ok(received) => {
309                                    if let Err(e) =
310                                        resp_tx.send((received.priority, received)).await
311                                    {
312                                        log::warn!(
313                                            "gRPC duplex transfer send response message failure, addr:{}, {}",
314                                            addr,
315                                            e.to_string()
316                                        );
317                                    }
318                                }
319                            }
320                        }
321                        log::warn!(
322                            "gRPC duplex transfer response stream recv None, addr:{}",
323                            addr,
324                        );
325                        tokio::time::sleep(Duration::from_secs(3)).await;
326                        continue;
327                    }
328                }
329            }
330        });
331        mailbox
332    }
333}
334
/// Handle for queueing messages onto a one-way streaming transfer.
#[derive(Clone)]
pub struct Mailbox {
    /// Sending half of the priority channel feeding the request stream.
    req_tx: Sender<(Priority, Message)>,
    /// Queue backing the request channel; read to report its length.
    req_queue: PriorityQueueType,
    /// Capacity the request channel was created with.
    req_queue_cap: usize,
    /// Payloads longer than this many bytes are split into chunks.
    chunk_size: usize,
}
342
343impl Mailbox {
344    #[inline]
345    fn new(
346        req_tx: Sender<(Priority, Message)>,
347        req_queue: PriorityQueueType,
348        req_queue_cap: usize,
349        chunk_size: usize,
350    ) -> Self {
351        Self {
352            req_tx,
353            req_queue,
354            req_queue_cap,
355            chunk_size,
356        }
357    }
358
359    #[inline]
360    pub fn req_queue_is_full(&self) -> bool {
361        self.req_queue_len() >= self.req_queue_cap
362    }
363
364    #[inline]
365    pub fn req_queue_len(&self) -> usize {
366        self.req_queue.read().len()
367    }
368
369    #[inline]
370    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
371        self.send_priority(data, Priority::MIN).await
372    }
373
374    #[inline]
375    pub async fn send_priority(
376        &mut self,
377        data: Vec<u8>,
378        p: Priority,
379    ) -> Result<(), SendError<Vec<u8>>> {
380        if data.len() > self.chunk_size {
381            //chunked transfer
382            for msg in split_into_chunks(next_id(), data.as_slice(), p, self.chunk_size) {
383                self.req_tx.send((p, msg)).await.map_err(Self::error)?;
384            }
385            Ok(())
386        } else {
387            let msg = Message {
388                id: next_id(),
389                priority: p,
390                total_chunks: 0,
391                chunk_index: 0,
392                data: Some(data),
393                ..Default::default()
394            };
395            self.req_tx.send((p, msg)).await.map_err(Self::error)
396        }
397    }
398
399    #[inline]
400    pub async fn quick_send(&mut self, data: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
401        self.send_priority(data, Priority::MAX).await
402    }
403
404    #[inline]
405    fn error(e: mpsc::SendError<(Priority, Message)>) -> SendError<Vec<u8>> {
406        if e.is_full() {
407            e.into_inner()
408                .map(|(_, msg)| SendError::<Vec<u8>>::full(msg.data.unwrap_or_default()))
409                .unwrap_or_else(|| SendError::<Vec<u8>>::disconnected(None))
410        } else if e.is_disconnected() {
411            SendError::<Vec<u8>>::disconnected(
412                e.into_inner().map(|(_, msg)| msg.data.unwrap_or_default()),
413            )
414        } else {
415            SendError::<Vec<u8>>::disconnected(None)
416        }
417    }
418}
419
/// Handle for sending requests over a bidirectional transfer and awaiting their responses.
#[derive(Clone)]
pub struct DuplexMailbox {
    /// Sending half of the priority channel feeding the request stream.
    req_tx: Sender<(Priority, Message)>,
    /// Queue backing the request channel; read to report its length.
    req_queue: PriorityQueueType,
    /// Queue backing the response channel; read to report its length.
    resp_queue: PriorityQueueType,
    /// One-shot senders for callers waiting on a response, keyed by request id.
    resp_senders: Arc<DashMap<Id, oneshot::Sender<Result<Vec<u8>>>>>,
    /// Capacity the channels were created with.
    queue_cap: usize,
    /// Payloads longer than this many bytes are split into chunks.
    chunk_size: usize,
    /// Maximum wait for a response; 120 seconds are used when `None`.
    timeout: Option<Duration>,
}
430
431#[allow(clippy::too_many_arguments)]
432impl DuplexMailbox {
433    #[inline]
434    fn new(
435        req_tx: Sender<(Priority, Message)>,
436        resp_rx: Receiver,
437        req_queue: PriorityQueueType,
438        resp_queue: PriorityQueueType,
439        queue_cap: usize,
440        chunk_size: usize,
441        recv_chunks_timeout: Duration,
442        timeout: Option<Duration>,
443    ) -> Self {
444        let resp_chunked_buffer = ChunkedBuffer::new(recv_chunks_timeout);
445        Self {
446            req_tx,
447            req_queue,
448            resp_queue,
449            resp_senders: Arc::new(DashMap::default()),
450            queue_cap,
451            chunk_size,
452            timeout,
453        }
454        .start(resp_rx, resp_chunked_buffer)
455    }
456
457    fn start(self, mut resp_rx: Receiver, resp_chunked_buffer: ChunkedBuffer) -> Self {
458        let resp_senders = self.resp_senders.clone();
459        tokio::spawn(async move {
460            let mut removed_ids = Vec::new();
461            while let Some(mut msg) = resp_rx.next().await {
462                if let Some(err) = msg.err.take() {
463                    if let Some((_, resp_sender)) = resp_senders.remove(&msg.id) {
464                        if !resp_sender.is_closed() {
465                            if let Err(e) = resp_sender.send(Err(anyhow!(err))) {
466                                log::warn!("response sender send fail, {:?}", e);
467                            }
468                        } else {
469                            log::warn!("response sender is closed");
470                        }
471                    }
472                    continue;
473                }
474
475                let msg = resp_chunked_buffer
476                    .merge(msg, None, Some(&mut removed_ids))
477                    .await;
478
479                if !removed_ids.is_empty() {
480                    for removed_id in removed_ids.drain(..) {
481                        log::debug!("removed_id: {}", removed_id);
482                        resp_senders.remove(&removed_id);
483                    }
484                }
485
486                let (id, data) = if let Some((id, _, data)) = msg {
487                    (id, data)
488                } else {
489                    continue;
490                };
491
492                if let Some((_, resp_sender)) = resp_senders.remove(&id) {
493                    if !resp_sender.is_closed() {
494                        if let Err(e) = resp_sender.send(Ok(data)) {
495                            log::warn!("response sender send fail, {:?}", e);
496                        }
497                    } else {
498                        log::warn!("response sender is closed");
499                    }
500                }
501            }
502            log::info!("exit response Receiver");
503        });
504        self
505    }
506
507    #[inline]
508    pub fn req_queue_is_full(&self) -> bool {
509        self.req_queue_len() >= self.queue_cap
510    }
511
512    #[inline]
513    pub fn req_queue_len(&self) -> usize {
514        self.req_queue.read().len()
515    }
516
517    #[inline]
518    pub fn resp_queue_len(&self) -> usize {
519        self.resp_queue.read().len()
520    }
521
522    #[inline]
523    pub fn resp_senders_len(&self) -> usize {
524        self.resp_senders.len()
525    }
526
527    #[inline]
528    pub async fn send(&mut self, data: Vec<u8>) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
529        self.send_priority(data, Priority::MIN).await
530    }
531
532    #[inline]
533    pub async fn send_priority(
534        &mut self,
535        data: Vec<u8>,
536        p: Priority,
537    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
538        let (res_tx, res_rx) = oneshot::channel::<Result<Vec<u8>>>();
539        let id = next_id();
540        let resp_senders = self.resp_senders.clone();
541        resp_senders.insert(id, res_tx);
542        defer! {
543            resp_senders.remove(&id);
544        }
545        self._send_priority(id, data, p, res_rx).await
546    }
547    #[inline]
548    async fn _send_priority(
549        &mut self,
550        id: Id,
551        data: Vec<u8>,
552        p: Priority,
553        res_rx: oneshot::Receiver<Result<Vec<u8>>>,
554    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
555        if data.len() > self.chunk_size {
556            //chunked transfer
557            for msg in split_into_chunks(id, data.as_slice(), p, self.chunk_size) {
558                self.req_tx.send((p, msg)).await.map_err(Self::error)?;
559            }
560        } else {
561            let msg = Message {
562                id,
563                priority: p,
564                total_chunks: 0,
565                chunk_index: 0,
566                data: Some(data),
567                ..Default::default()
568            };
569            self.req_tx.send((p, msg)).await.map_err(Self::error)?;
570        }
571
572        let res = tokio::time::timeout(
573            self.timeout.unwrap_or_else(|| Duration::from_secs(120)),
574            res_rx,
575        )
576        .await
577        .map_err(|e| SendError::error(e.to_string(), None))?
578        .map_err(|e| SendError::error(e.to_string(), None))?
579        .map_err(|e| SendError::error(e.to_string(), None))?;
580
581        Ok(res)
582    }
583
584    #[inline]
585    pub async fn quick_send(
586        &mut self,
587        data: Vec<u8>,
588    ) -> Result<Vec<u8>, SendError<Option<Vec<u8>>>> {
589        self.send_priority(data, Priority::MAX).await
590    }
591
592    #[inline]
593    fn error(e: mpsc::SendError<(Priority, Message)>) -> SendError<Option<Vec<u8>>> {
594        if e.is_full() {
595            e.into_inner()
596                .map(|(_, msg)| SendError::<Option<Vec<u8>>>::full(msg.data))
597                .unwrap_or_else(|| SendError::<Option<Vec<u8>>>::disconnected(None))
598        } else if e.is_disconnected() {
599            SendError::<Option<Vec<u8>>>::disconnected(e.into_inner().map(|(_, msg)| msg.data))
600        } else {
601            SendError::<Option<Vec<u8>>>::disconnected(None)
602        }
603    }
604}
605
606#[derive(Clone)]
607struct AuthInterceptor {
608    auth_token: Option<MetadataValue<Ascii>>,
609}
610
611impl Interceptor for AuthInterceptor {
612    #[inline]
613    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
614        if let Some(token) = self.auth_token.clone() {
615            request.metadata_mut().insert("authorization", token);
616        }
617        Ok(request)
618    }
619}
620
621#[allow(clippy::too_many_arguments)]
622#[inline]
623async fn connect(
624    addr: &str,
625    concurrency_limit: usize,
626    connect_timeout: Option<Duration>,
627    timeout: Option<Duration>,
628    tls: bool,
629    tls_ca: Option<&String>,
630    tls_domain: Option<&String>,
631    token: Option<String>,
632) -> Result<DataTransferClientType> {
633    let (endpoint, interceptor) = build_endpoint(
634        addr,
635        concurrency_limit,
636        connect_timeout,
637        timeout,
638        tls,
639        tls_ca,
640        tls_domain,
641        token,
642    )?;
643
644    //Connect
645    let channel = endpoint.connect().await?;
646
647    //Client
648    Ok(DataTransferClient::with_interceptor(channel, interceptor))
649}
650
651#[allow(clippy::too_many_arguments)]
652#[inline]
653fn connect_lazy(
654    addr: &str,
655    concurrency_limit: usize,
656    connect_timeout: Option<Duration>,
657    timeout: Option<Duration>,
658    tls: bool,
659    tls_ca: Option<&String>,
660    tls_domain: Option<&String>,
661    token: Option<String>,
662) -> Result<DataTransferClientType> {
663    let (endpoint, interceptor) = build_endpoint(
664        addr,
665        concurrency_limit,
666        connect_timeout,
667        timeout,
668        tls,
669        tls_ca,
670        tls_domain,
671        token,
672    )?;
673
674    //Connect lazy
675    let channel = endpoint.connect_lazy();
676
677    //Client
678    Ok(DataTransferClient::with_interceptor(channel, interceptor))
679}
680
681#[allow(clippy::too_many_arguments)]
682#[inline]
683fn build_endpoint(
684    addr: &str,
685    concurrency_limit: usize,
686    connect_timeout: Option<Duration>,
687    timeout: Option<Duration>,
688    tls: bool,
689    tls_ca: Option<&String>,
690    tls_domain: Option<&String>,
691    token: Option<String>,
692) -> Result<(Endpoint, AuthInterceptor)> {
693    //TLS支持
694    let tls_client_cfg = if tls {
695        let mut tls_client_cfg = ClientTlsConfig::new();
696        if let Some(tls_ca) = tls_ca {
697            let pem = std::fs::read_to_string(tls_ca)?;
698            tls_client_cfg = tls_client_cfg.ca_certificate(Certificate::from_pem(pem));
699        }
700        if let Some(tls_domain) = tls_domain {
701            tls_client_cfg = tls_client_cfg.domain_name(tls_domain);
702        }
703        Some(tls_client_cfg)
704    } else {
705        None
706    };
707
708    //gRPC Auth
709    let auth_token = if let Some(token) = token {
710        if token.is_empty() {
711            return Err(Error::msg("auth token is empty"));
712        }
713        Some(format!("Bearer {}", token).parse::<MetadataValue<_>>()?)
714    } else {
715        None
716    };
717
718    //Concurrency limit
719    let concurrency_limit = if concurrency_limit == 0 {
720        1
721    } else {
722        concurrency_limit
723    };
724
725    //Endpoint
726    let endpoint = Channel::from_shared(format!("http://{}", addr)).map(|endpoint| {
727        let mut endpoint = endpoint.concurrency_limit(concurrency_limit);
728        if let Some(connect_timeout) = connect_timeout {
729            endpoint = endpoint.connect_timeout(connect_timeout);
730        }
731        if let Some(timeout) = timeout {
732            endpoint = endpoint.timeout(timeout);
733        }
734        if let Some(tls_client_cfg) = tls_client_cfg {
735            endpoint.tls_config(tls_client_cfg)
736        } else {
737            Ok(endpoint)
738        }
739    })??;
740    Ok((endpoint, AuthInterceptor { auth_token }))
741}
742
743#[derive(Clone)]
744struct Receiver {
745    rx: Arc<RwLock<mpsc::Receiver<(Priority, Message)>>>,
746}
747
748impl Receiver {
749    fn new(rx: mpsc::Receiver<(Priority, Message)>) -> Self {
750        Receiver {
751            rx: Arc::new(RwLock::new(rx)),
752        }
753    }
754
755    #[inline]
756    pub fn is_closed(&self) -> bool {
757        self.rx.read().is_closed()
758    }
759}
760
761impl Stream for Receiver {
762    type Item = Message;
763
764    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
765        match Pin::new(self.rx.write().deref_mut()).poll_next(cx) {
766            Poll::Pending => Poll::Pending,
767            Poll::Ready(None) => Poll::Ready(None),
768            Poll::Ready(Some((_, msg))) => Poll::Ready(Some(msg)),
769        }
770    }
771}
772
773#[inline]
774pub(crate) fn next_id() -> Id {
775    use once_cell::sync::OnceCell;
776    use std::sync::atomic::{AtomicU64, Ordering};
777    static ID_GENERATOR: OnceCell<AtomicU64> = OnceCell::new();
778    let id_generator = ID_GENERATOR.get_or_init(|| AtomicU64::new(1));
779    id_generator.fetch_add(1, Ordering::SeqCst)
780}
781
/// Error returned by the mailbox send operations.
#[derive(Clone, PartialEq, Eq)]
pub enum SendError<T> {
    /// Failure reported by the underlying channel.
    SendError(mpsc::SendError<T>),
    /// Any other failure: a message plus, optionally, a value handed back to the caller.
    Error(String, Option<T>),
}
787
788impl<T> SendError<T> {
789    #[inline]
790    pub fn full(val: T) -> Self {
791        SendError::SendError(mpsc::SendError::full(val))
792    }
793
794    #[inline]
795    pub fn disconnected(val: Option<T>) -> Self {
796        SendError::SendError(mpsc::SendError::disconnected(val))
797    }
798
799    #[inline]
800    pub fn error(e: String, val: Option<T>) -> Self {
801        SendError::Error(e, val)
802    }
803}
804
805impl<T> core::fmt::Display for SendError<T> {
806    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
807        match self {
808            SendError::SendError(e) => {
809                write!(f, "{:?}", e)
810            }
811            SendError::Error(e, _) => {
812                write!(f, "{:?}", e)
813            }
814        }
815    }
816}
817
818impl<T> core::fmt::Debug for SendError<T> {
819    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
820        match self {
821            SendError::SendError(e) => f.debug_struct("SendError").field("reason", e).finish(),
822            SendError::Error(e, _) => f.debug_struct("SendError").field("reason", e).finish(),
823        }
824    }
825}
826
827impl<T> SendError<T> {
828    /// Returns `true` if this error is a result of the mpsc being full.
829    #[inline]
830    pub fn is_full(&self) -> bool {
831        if let SendError::SendError(e) = self {
832            e.is_full()
833        } else {
834            false
835        }
836    }
837
838    /// Returns `true` if this error is a result of the receiver being dropped.
839    #[inline]
840    pub fn is_disconnected(&self) -> bool {
841        if let SendError::SendError(e) = self {
842            e.is_disconnected()
843        } else {
844            false
845        }
846    }
847
848    /// Returns the message that was attempted to be sent but failed.
849    #[inline]
850    pub fn into_inner(self) -> Option<T> {
851        match self {
852            SendError::SendError(inner) => inner.into_inner(),
853            SendError::Error(_, val) => val,
854        }
855    }
856}
857
/// Lets `SendError` be used as a standard error type; the `Any` bound requires `T: 'static`.
impl<T: core::any::Any> std::error::Error for SendError<T> {}