1use crossbeam_queue::SegQueue;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4
5use super::{Backpressure, ChannelRecv, ChannelSend, ChannelStats, CloseBehavior, RecvOutcome};
6
7#[cfg(feature = "metrics")]
8use crate::metrics::MetricsSink;
9
10struct UnboundedInner<T> {
11 queue: SegQueue<T>,
12 closed: AtomicBool,
13 senders: AtomicUsize,
14 receivers: AtomicUsize,
15 enqueued: AtomicU64,
16 dropped: AtomicU64,
17 drained: AtomicU64,
18 depth: AtomicUsize,
19 close_behavior: CloseBehavior,
20 #[cfg(feature = "metrics")]
21 metrics: Option<Arc<dyn MetricsSink>>,
22}
23
24impl<T> UnboundedInner<T> {
25 fn new(close_behavior: CloseBehavior) -> Self {
26 Self {
27 queue: SegQueue::new(),
28 closed: AtomicBool::new(false),
29 senders: AtomicUsize::new(1),
30 receivers: AtomicUsize::new(1),
31 enqueued: AtomicU64::new(0),
32 dropped: AtomicU64::new(0),
33 drained: AtomicU64::new(0),
34 depth: AtomicUsize::new(0),
35 close_behavior,
36 #[cfg(feature = "metrics")]
37 metrics: None,
38 }
39 }
40
41 #[cfg(feature = "metrics")]
42 fn new_with_metrics(close_behavior: CloseBehavior, metrics: Arc<dyn MetricsSink>) -> Self {
43 Self {
44 queue: SegQueue::new(),
45 closed: AtomicBool::new(false),
46 senders: AtomicUsize::new(1),
47 receivers: AtomicUsize::new(1),
48 enqueued: AtomicU64::new(0),
49 dropped: AtomicU64::new(0),
50 drained: AtomicU64::new(0),
51 depth: AtomicUsize::new(0),
52 close_behavior,
53 metrics: Some(metrics),
54 }
55 }
56
57 fn mark_closed(&self) {
58 self.closed.store(true, Ordering::Release);
59 }
60
61 fn try_close(&self) {
62 match self.close_behavior {
63 CloseBehavior::FailFast => {
64 if self.senders.load(Ordering::Acquire) == 0
65 || self.receivers.load(Ordering::Acquire) == 0
66 {
67 self.mark_closed();
68 }
69 }
70 CloseBehavior::DrainUntilSendersDone => {
71 if self.senders.load(Ordering::Acquire) == 0 {
72 self.mark_closed();
73 }
74 }
75 }
76 }
77
78 #[cfg(feature = "metrics")]
79 fn inc(&self, key: &'static str) {
80 if let Some(metrics) = &self.metrics {
81 metrics.increment(key, 1);
82 }
83 }
84}
85
86pub struct UnboundedSender<T> {
87 inner: Arc<UnboundedInner<T>>,
88}
89
90impl<T> Clone for UnboundedSender<T> {
91 fn clone(&self) -> Self {
92 self.inner.senders.fetch_add(1, Ordering::Relaxed);
93 Self {
94 inner: Arc::clone(&self.inner),
95 }
96 }
97}
98
99impl<T> Drop for UnboundedSender<T> {
100 fn drop(&mut self) {
101 self.inner.senders.fetch_sub(1, Ordering::Relaxed);
102 self.inner.try_close();
103 }
104}
105
106impl<T> std::fmt::Debug for UnboundedSender<T> {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("UnboundedSender").finish_non_exhaustive()
109 }
110}
111
112pub struct UnboundedReceiver<T> {
113 inner: Arc<UnboundedInner<T>>,
114}
115
116impl<T> Clone for UnboundedReceiver<T> {
117 fn clone(&self) -> Self {
118 self.inner.receivers.fetch_add(1, Ordering::Relaxed);
119 Self {
120 inner: Arc::clone(&self.inner),
121 }
122 }
123}
124
125impl<T> Drop for UnboundedReceiver<T> {
126 fn drop(&mut self) {
127 self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
128 self.inner.try_close();
129 }
130}
131
132pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
133 let inner = Arc::new(UnboundedInner::new(CloseBehavior::FailFast));
134 (
135 UnboundedSender {
136 inner: Arc::clone(&inner),
137 },
138 UnboundedReceiver { inner },
139 )
140}
141
142pub fn unbounded_with_behavior<T>(
143 close_behavior: CloseBehavior,
144) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
145 let inner = Arc::new(UnboundedInner::new(close_behavior));
146 (
147 UnboundedSender {
148 inner: Arc::clone(&inner),
149 },
150 UnboundedReceiver { inner },
151 )
152}
153
154#[cfg(feature = "metrics")]
155pub fn unbounded_with_metrics<T>(
156 metrics: Arc<dyn MetricsSink>,
157) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
158 let inner = Arc::new(UnboundedInner::new_with_metrics(
159 CloseBehavior::FailFast,
160 metrics,
161 ));
162 (
163 UnboundedSender {
164 inner: Arc::clone(&inner),
165 },
166 UnboundedReceiver { inner },
167 )
168}
169
170#[cfg(feature = "metrics")]
171pub fn unbounded_with_metrics_and_behavior<T>(
172 close_behavior: CloseBehavior,
173 metrics: Arc<dyn MetricsSink>,
174) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
175 let inner = Arc::new(UnboundedInner::new_with_metrics(close_behavior, metrics));
176 (
177 UnboundedSender {
178 inner: Arc::clone(&inner),
179 },
180 UnboundedReceiver { inner },
181 )
182}
183
184impl<T: Send> ChannelSend<T> for UnboundedSender<T> {
185 fn send(&self, value: T) -> Backpressure {
186 if self.inner.closed.load(Ordering::Acquire) {
187 #[cfg(feature = "metrics")]
188 self.inner.inc("channel.unbounded.closed");
189 return Backpressure::Closed;
190 }
191 self.inner.queue.push(value);
192 self.inner.enqueued.fetch_add(1, Ordering::Relaxed);
193 self.inner.depth.fetch_add(1, Ordering::Relaxed);
194 Backpressure::Ok
195 }
196}
197
198impl<T: Send> ChannelRecv<T> for UnboundedReceiver<T> {
199 fn try_recv(&self) -> RecvOutcome<T> {
200 match self.inner.queue.pop() {
201 Some(v) => {
202 self.inner.drained.fetch_add(1, Ordering::Relaxed);
203 self.inner.depth.fetch_sub(1, Ordering::Relaxed);
204 RecvOutcome::Data(v)
205 }
206 None if self.inner.closed.load(Ordering::Acquire) => RecvOutcome::Closed,
207 None => RecvOutcome::Empty,
208 }
209 }
210}
211
212impl<T> UnboundedReceiver<T> {
213 pub fn stats(&self) -> ChannelStats {
214 ChannelStats {
215 enqueued: self.inner.enqueued.load(Ordering::Relaxed),
216 dropped: self.inner.dropped.load(Ordering::Relaxed),
217 drained: self.inner.drained.load(Ordering::Relaxed),
218 depth: self.inner.depth.load(Ordering::Relaxed),
219 closed: self.inner.closed.load(Ordering::Relaxed),
220 }
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227 use proptest::prelude::*;
228 use std::sync::Arc;
229 use std::sync::atomic::{AtomicUsize, Ordering};
230 use std::thread;
231
232 #[test]
233 fn unbounded_send_recv() {
234 let (tx, rx) = unbounded();
235 assert_eq!(tx.send(7), Backpressure::Ok);
236 assert_eq!(rx.try_recv(), RecvOutcome::Data(7));
237 assert_eq!(rx.try_recv(), RecvOutcome::Empty);
238 }
239
240 #[test]
241 fn unbounded_closed_when_senders_drop() {
242 let (tx, rx) = unbounded::<u64>();
243 drop(tx);
244 assert_eq!(rx.try_recv(), RecvOutcome::Closed);
245 }
246
247 proptest! {
248 #[test]
249 fn unbounded_preserves_order(input in proptest::collection::vec(any::<u32>(), 1..50)) {
250 let (tx, rx) = unbounded();
251 for v in &input {
252 let _ = tx.send(*v);
253 }
254 let mut drained = Vec::new();
255 while let RecvOutcome::Data(v) = rx.try_recv() {
256 drained.push(v);
257 }
258 prop_assert_eq!(drained, input);
259 }
260 }
261
262 #[test]
263 fn unbounded_mpmc_stress() {
264 let (tx, rx) = unbounded();
265 let tx = Arc::new(tx);
266 let rx = Arc::new(rx);
267 let produced = 4usize * 50usize;
268 let received = Arc::new(AtomicUsize::new(0));
269
270 let mut handles = Vec::new();
271 for _ in 0..4 {
272 let txc = tx.clone();
273 handles.push(thread::spawn(move || {
274 for i in 0..50u32 {
275 let _ = txc.send(i);
276 }
277 }));
278 }
279
280 let mut recv_handles = Vec::new();
281 for _ in 0..2 {
282 let rxc = rx.clone();
283 let recv_count = received.clone();
284 recv_handles.push(thread::spawn(move || {
285 loop {
286 match rxc.try_recv() {
287 RecvOutcome::Data(_) => {
288 recv_count.fetch_add(1, Ordering::Relaxed);
289 }
290 RecvOutcome::Empty => {
291 if recv_count.load(Ordering::Relaxed) >= produced {
292 break;
293 }
294 std::thread::yield_now();
295 }
296 RecvOutcome::Closed => break,
297 }
298 }
299 }));
300 }
301
302 for h in handles {
303 h.join().unwrap();
304 }
305 drop(tx);
306
307 for h in recv_handles {
308 h.join().unwrap();
309 }
310 assert_eq!(received.load(Ordering::Relaxed), produced);
311 }
312}
313
314#[cfg(all(test, feature = "metrics"))]
315mod metric_tests {
316 use super::*;
317 use crate::metrics::InMemoryMetrics;
318 use std::sync::Arc;
319
320 #[test]
321 fn metrics_record_closed() {
322 let metrics = Arc::new(InMemoryMetrics::default());
323 let collector: Arc<dyn crate::metrics::MetricsSink> = metrics.clone();
324 let (tx, rx) = unbounded_with_metrics(collector);
325 drop(rx);
326 assert_eq!(tx.send(1), Backpressure::Closed);
327 assert_eq!(metrics.counter("channel.unbounded.closed"), 1);
328 }
329}