meslin/features/
priority.rs

1use crate::*;
2use async_priority_channel as prio;
3use std::fmt::Debug;
4
5/// Wrapper around [`async_priority_channel::Sender`].
6pub struct Sender<P, O: Ord> {
7    sender: prio::Sender<P, O>,
8}
9
10/// Re-export of [`async_priority_channel::Receiver`].
11pub use prio::Receiver;
12
13impl<P, O: Ord> Sender<P, O> {
14    pub fn inner(&self) -> &prio::Sender<P, O> {
15        &self.sender
16    }
17
18    pub fn into_inner(self) -> prio::Sender<P, O> {
19        self.sender
20    }
21
22    pub fn inner_mut(&mut self) -> &mut prio::Sender<P, O> {
23        &mut self.sender
24    }
25
26    pub fn from_inner(sender: prio::Sender<P, O>) -> Self {
27        Self { sender }
28    }
29}
30
31impl<P, O: Ord> IsSender for Sender<P, O> {
32    type With = O;
33
34    fn is_closed(&self) -> bool {
35        self.sender.is_closed()
36    }
37
38    fn capacity(&self) -> Option<usize> {
39        self.sender.capacity().map(|c| c.try_into().unwrap())
40    }
41
42    fn len(&self) -> usize {
43        self.sender.len().try_into().unwrap()
44    }
45
46    fn receiver_count(&self) -> usize {
47        self.sender.receiver_count()
48    }
49
50    fn sender_count(&self) -> usize {
51        self.sender.sender_count()
52    }
53}
54
55impl<P: Send, O: Ord + Send> SendsProtocol for Sender<P, O> {
56    type Protocol = P;
57
58    async fn send_protocol_with(
59        this: &Self,
60        protocol: Self::Protocol,
61        with: O,
62    ) -> Result<(), SendError<(Self::Protocol, O)>> {
63        this.sender
64            .send(protocol, with)
65            .await
66            .map_err(|e| SendError(e.0))
67    }
68
69    fn try_send_protocol_with(
70        this: &Self,
71        protocol: Self::Protocol,
72        with: O,
73    ) -> Result<(), TrySendError<(Self::Protocol, O)>> {
74        this.sender.try_send(protocol, with).map_err(|e| match e {
75            prio::TrySendError::Full(e) => TrySendError::Full(e),
76            prio::TrySendError::Closed(e) => TrySendError::Closed(e),
77        })
78    }
79}
80
81impl<P: Debug, O: Ord + Debug> Debug for Sender<P, O> {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("Sender")
84            .field("sender", &self.sender)
85            .finish()
86    }
87}
88
89impl<P, O: Ord> Clone for Sender<P, O> {
90    fn clone(&self) -> Self {
91        Self {
92            sender: self.sender.clone(),
93        }
94    }
95}
96
97pub fn bounded<P, O: Ord>(size: usize) -> (Sender<P, O>, prio::Receiver<P, O>) {
98    let (sender, receiver) = prio::bounded(size.try_into().unwrap());
99    (Sender { sender }, receiver)
100}
101
102pub fn unbounded<P, O: Ord>() -> (Sender<P, O>, prio::Receiver<P, O>) {
103    let (sender, receiver) = prio::unbounded();
104    (Sender { sender }, receiver)
105}