nonblocking_channel/
lib.rs1use std::{
2 mem::MaybeUninit,
3 num::NonZeroUsize,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, Mutex,
7 },
8};
9
10use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
11
12#[must_use]
14#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
15pub enum SendResult<T> {
16 Ok,
17 Full(T),
18 Disconnected,
19}
20
21impl<T> SendResult<T> {
22 pub fn is_disconnected(&self) -> bool {
24 matches!(self, Self::Disconnected)
25 }
26
27 pub fn is_ok(&self) -> bool {
29 matches!(self, Self::Ok)
30 }
31}
32impl<T: std::fmt::Debug> SendResult<T> {
33 pub fn unwrap(self) {
34 match self {
35 SendResult::Ok => return,
36 SendResult::Full(message) => panic!(
37 "failed to send message - queue was full when sending {:?}",
38 message
39 ),
40 SendResult::Disconnected => panic!("client was disconnected when sending message"),
41 }
42 }
43}
44
45pub struct NonBlockingSender<T> {
52 inner: Producer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
53 is_closed: Arc<AtomicBool>,
54}
55impl<T> NonBlockingSender<T> {
56 pub fn try_send(&mut self, message: T) -> SendResult<T> {
62 if self.is_closed.load(Ordering::SeqCst) {
63 SendResult::Disconnected
64 } else {
65 let res = self.inner.push(message);
66 if let Err(message) = res {
67 SendResult::Full(message)
68 } else {
69 SendResult::Ok
70 }
71 }
72 }
73
74 pub fn mpsc(self) -> MicroBlockingSender<T> {
76 return MicroBlockingSender {
77 inner: Arc::new(Mutex::new(self)),
78 };
79 }
80}
81
82impl<T> Drop for NonBlockingSender<T> {
83 fn drop(&mut self) {
84 self.is_closed.store(true, Ordering::SeqCst);
86 }
87}
88
89pub struct MicroBlockingSender<T> {
93 inner: Arc<Mutex<NonBlockingSender<T>>>,
94}
95
96impl<T> MicroBlockingSender<T> {
97 pub fn try_send(&self, message: T) -> SendResult<T> {
102 self.inner.lock().unwrap().try_send(message)
103 }
104}
105
106impl<T> Clone for MicroBlockingSender<T> {
107 fn clone(&self) -> Self {
108 Self {
109 inner: self.inner.clone(),
110 }
111 }
112}
113
114#[must_use]
116#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub enum RecvResult<T> {
118 Ok(T),
119 Empty,
120 Disconnected,
121}
122
123impl<T> RecvResult<T> {
124 pub fn is_disconnected(&self) -> bool {
126 matches!(self, Self::Disconnected)
127 }
128
129 pub fn is_ok(&self) -> bool {
131 matches!(self, Self::Ok(_) | Self::Empty)
132 }
133}
134impl<T> RecvResult<T> {
135 pub fn unwrap(self) -> Option<T> {
136 match self {
137 RecvResult::Ok(message) => Some(message),
138 RecvResult::Empty => None,
139 RecvResult::Disconnected => panic!("receiver was disconnected when receiving message"),
140 }
141 }
142}
143
144pub struct NonBlockingReceiver<T> {
146 inner: Consumer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
147 is_closed: Arc<AtomicBool>,
148}
149impl<T> NonBlockingReceiver<T> {
150 pub fn try_recv(&mut self) -> RecvResult<T> {
156 if self.is_closed.load(Ordering::SeqCst) {
157 RecvResult::Disconnected
158 } else {
159 let res = self.inner.pop();
160 if let Some(message) = res {
161 RecvResult::Ok(message)
162 } else {
163 RecvResult::Empty
164 }
165 }
166 }
167}
168
169impl<T> Drop for NonBlockingReceiver<T> {
170 fn drop(&mut self) {
171 self.is_closed.store(true, Ordering::SeqCst);
173 }
174}
175
176pub fn nonblocking_channel<T>(
177 capacity: NonZeroUsize,
178) -> (NonBlockingSender<T>, NonBlockingReceiver<T>) {
179 let (sender, receiver) = HeapRb::<T>::new(capacity.get()).split();
180 let is_closed = Arc::new(AtomicBool::from(false));
181
182 let sender = NonBlockingSender {
183 inner: sender,
184 is_closed: Arc::clone(&is_closed),
185 };
186 let receiver = NonBlockingReceiver {
187 inner: receiver,
188 is_closed,
189 };
190
191 return (sender, receiver);
192}