meslin/features/
priority.rs1use crate::*;
2use async_priority_channel as prio;
3use std::fmt::Debug;
4
5pub struct Sender<P, O: Ord> {
7 sender: prio::Sender<P, O>,
8}
9
10pub use prio::Receiver;
12
13impl<P, O: Ord> Sender<P, O> {
14 pub fn inner(&self) -> &prio::Sender<P, O> {
15 &self.sender
16 }
17
18 pub fn into_inner(self) -> prio::Sender<P, O> {
19 self.sender
20 }
21
22 pub fn inner_mut(&mut self) -> &mut prio::Sender<P, O> {
23 &mut self.sender
24 }
25
26 pub fn from_inner(sender: prio::Sender<P, O>) -> Self {
27 Self { sender }
28 }
29}
30
31impl<P, O: Ord> IsSender for Sender<P, O> {
32 type With = O;
33
34 fn is_closed(&self) -> bool {
35 self.sender.is_closed()
36 }
37
38 fn capacity(&self) -> Option<usize> {
39 self.sender.capacity().map(|c| c.try_into().unwrap())
40 }
41
42 fn len(&self) -> usize {
43 self.sender.len().try_into().unwrap()
44 }
45
46 fn receiver_count(&self) -> usize {
47 self.sender.receiver_count()
48 }
49
50 fn sender_count(&self) -> usize {
51 self.sender.sender_count()
52 }
53}
54
55impl<P: Send, O: Ord + Send> SendsProtocol for Sender<P, O> {
56 type Protocol = P;
57
58 async fn send_protocol_with(
59 this: &Self,
60 protocol: Self::Protocol,
61 with: O,
62 ) -> Result<(), SendError<(Self::Protocol, O)>> {
63 this.sender
64 .send(protocol, with)
65 .await
66 .map_err(|e| SendError(e.0))
67 }
68
69 fn try_send_protocol_with(
70 this: &Self,
71 protocol: Self::Protocol,
72 with: O,
73 ) -> Result<(), TrySendError<(Self::Protocol, O)>> {
74 this.sender.try_send(protocol, with).map_err(|e| match e {
75 prio::TrySendError::Full(e) => TrySendError::Full(e),
76 prio::TrySendError::Closed(e) => TrySendError::Closed(e),
77 })
78 }
79}
80
81impl<P: Debug, O: Ord + Debug> Debug for Sender<P, O> {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("Sender")
84 .field("sender", &self.sender)
85 .finish()
86 }
87}
88
89impl<P, O: Ord> Clone for Sender<P, O> {
90 fn clone(&self) -> Self {
91 Self {
92 sender: self.sender.clone(),
93 }
94 }
95}
96
97pub fn bounded<P, O: Ord>(size: usize) -> (Sender<P, O>, prio::Receiver<P, O>) {
98 let (sender, receiver) = prio::bounded(size.try_into().unwrap());
99 (Sender { sender }, receiver)
100}
101
102pub fn unbounded<P, O: Ord>() -> (Sender<P, O>, prio::Receiver<P, O>) {
103 let (sender, receiver) = prio::unbounded();
104 (Sender { sender }, receiver)
105}