daedalus_core/channels/
unbounded.rs

1use crossbeam_queue::SegQueue;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4
5use super::{Backpressure, ChannelRecv, ChannelSend, ChannelStats, CloseBehavior, RecvOutcome};
6
7#[cfg(feature = "metrics")]
8use crate::metrics::MetricsSink;
9
10struct UnboundedInner<T> {
11    queue: SegQueue<T>,
12    closed: AtomicBool,
13    senders: AtomicUsize,
14    receivers: AtomicUsize,
15    enqueued: AtomicU64,
16    dropped: AtomicU64,
17    drained: AtomicU64,
18    depth: AtomicUsize,
19    close_behavior: CloseBehavior,
20    #[cfg(feature = "metrics")]
21    metrics: Option<Arc<dyn MetricsSink>>,
22}
23
24impl<T> UnboundedInner<T> {
25    fn new(close_behavior: CloseBehavior) -> Self {
26        Self {
27            queue: SegQueue::new(),
28            closed: AtomicBool::new(false),
29            senders: AtomicUsize::new(1),
30            receivers: AtomicUsize::new(1),
31            enqueued: AtomicU64::new(0),
32            dropped: AtomicU64::new(0),
33            drained: AtomicU64::new(0),
34            depth: AtomicUsize::new(0),
35            close_behavior,
36            #[cfg(feature = "metrics")]
37            metrics: None,
38        }
39    }
40
41    #[cfg(feature = "metrics")]
42    fn new_with_metrics(close_behavior: CloseBehavior, metrics: Arc<dyn MetricsSink>) -> Self {
43        Self {
44            queue: SegQueue::new(),
45            closed: AtomicBool::new(false),
46            senders: AtomicUsize::new(1),
47            receivers: AtomicUsize::new(1),
48            enqueued: AtomicU64::new(0),
49            dropped: AtomicU64::new(0),
50            drained: AtomicU64::new(0),
51            depth: AtomicUsize::new(0),
52            close_behavior,
53            metrics: Some(metrics),
54        }
55    }
56
57    fn mark_closed(&self) {
58        self.closed.store(true, Ordering::Release);
59    }
60
61    fn try_close(&self) {
62        match self.close_behavior {
63            CloseBehavior::FailFast => {
64                if self.senders.load(Ordering::Acquire) == 0
65                    || self.receivers.load(Ordering::Acquire) == 0
66                {
67                    self.mark_closed();
68                }
69            }
70            CloseBehavior::DrainUntilSendersDone => {
71                if self.senders.load(Ordering::Acquire) == 0 {
72                    self.mark_closed();
73                }
74            }
75        }
76    }
77
78    #[cfg(feature = "metrics")]
79    fn inc(&self, key: &'static str) {
80        if let Some(metrics) = &self.metrics {
81            metrics.increment(key, 1);
82        }
83    }
84}
85
86pub struct UnboundedSender<T> {
87    inner: Arc<UnboundedInner<T>>,
88}
89
90impl<T> Clone for UnboundedSender<T> {
91    fn clone(&self) -> Self {
92        self.inner.senders.fetch_add(1, Ordering::Relaxed);
93        Self {
94            inner: Arc::clone(&self.inner),
95        }
96    }
97}
98
99impl<T> Drop for UnboundedSender<T> {
100    fn drop(&mut self) {
101        self.inner.senders.fetch_sub(1, Ordering::Relaxed);
102        self.inner.try_close();
103    }
104}
105
106impl<T> std::fmt::Debug for UnboundedSender<T> {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("UnboundedSender").finish_non_exhaustive()
109    }
110}
111
112pub struct UnboundedReceiver<T> {
113    inner: Arc<UnboundedInner<T>>,
114}
115
116impl<T> Clone for UnboundedReceiver<T> {
117    fn clone(&self) -> Self {
118        self.inner.receivers.fetch_add(1, Ordering::Relaxed);
119        Self {
120            inner: Arc::clone(&self.inner),
121        }
122    }
123}
124
125impl<T> Drop for UnboundedReceiver<T> {
126    fn drop(&mut self) {
127        self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
128        self.inner.try_close();
129    }
130}
131
132pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
133    let inner = Arc::new(UnboundedInner::new(CloseBehavior::FailFast));
134    (
135        UnboundedSender {
136            inner: Arc::clone(&inner),
137        },
138        UnboundedReceiver { inner },
139    )
140}
141
142pub fn unbounded_with_behavior<T>(
143    close_behavior: CloseBehavior,
144) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
145    let inner = Arc::new(UnboundedInner::new(close_behavior));
146    (
147        UnboundedSender {
148            inner: Arc::clone(&inner),
149        },
150        UnboundedReceiver { inner },
151    )
152}
153
154#[cfg(feature = "metrics")]
155pub fn unbounded_with_metrics<T>(
156    metrics: Arc<dyn MetricsSink>,
157) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
158    let inner = Arc::new(UnboundedInner::new_with_metrics(
159        CloseBehavior::FailFast,
160        metrics,
161    ));
162    (
163        UnboundedSender {
164            inner: Arc::clone(&inner),
165        },
166        UnboundedReceiver { inner },
167    )
168}
169
170#[cfg(feature = "metrics")]
171pub fn unbounded_with_metrics_and_behavior<T>(
172    close_behavior: CloseBehavior,
173    metrics: Arc<dyn MetricsSink>,
174) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
175    let inner = Arc::new(UnboundedInner::new_with_metrics(close_behavior, metrics));
176    (
177        UnboundedSender {
178            inner: Arc::clone(&inner),
179        },
180        UnboundedReceiver { inner },
181    )
182}
183
184impl<T: Send> ChannelSend<T> for UnboundedSender<T> {
185    fn send(&self, value: T) -> Backpressure {
186        if self.inner.closed.load(Ordering::Acquire) {
187            #[cfg(feature = "metrics")]
188            self.inner.inc("channel.unbounded.closed");
189            return Backpressure::Closed;
190        }
191        self.inner.queue.push(value);
192        self.inner.enqueued.fetch_add(1, Ordering::Relaxed);
193        self.inner.depth.fetch_add(1, Ordering::Relaxed);
194        Backpressure::Ok
195    }
196}
197
198impl<T: Send> ChannelRecv<T> for UnboundedReceiver<T> {
199    fn try_recv(&self) -> RecvOutcome<T> {
200        match self.inner.queue.pop() {
201            Some(v) => {
202                self.inner.drained.fetch_add(1, Ordering::Relaxed);
203                self.inner.depth.fetch_sub(1, Ordering::Relaxed);
204                RecvOutcome::Data(v)
205            }
206            None if self.inner.closed.load(Ordering::Acquire) => RecvOutcome::Closed,
207            None => RecvOutcome::Empty,
208        }
209    }
210}
211
212impl<T> UnboundedReceiver<T> {
213    pub fn stats(&self) -> ChannelStats {
214        ChannelStats {
215            enqueued: self.inner.enqueued.load(Ordering::Relaxed),
216            dropped: self.inner.dropped.load(Ordering::Relaxed),
217            drained: self.inner.drained.load(Ordering::Relaxed),
218            depth: self.inner.depth.load(Ordering::Relaxed),
219            closed: self.inner.closed.load(Ordering::Relaxed),
220        }
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use super::*;
227    use proptest::prelude::*;
228    use std::sync::Arc;
229    use std::sync::atomic::{AtomicUsize, Ordering};
230    use std::thread;
231
232    #[test]
233    fn unbounded_send_recv() {
234        let (tx, rx) = unbounded();
235        assert_eq!(tx.send(7), Backpressure::Ok);
236        assert_eq!(rx.try_recv(), RecvOutcome::Data(7));
237        assert_eq!(rx.try_recv(), RecvOutcome::Empty);
238    }
239
240    #[test]
241    fn unbounded_closed_when_senders_drop() {
242        let (tx, rx) = unbounded::<u64>();
243        drop(tx);
244        assert_eq!(rx.try_recv(), RecvOutcome::Closed);
245    }
246
247    proptest! {
248        #[test]
249        fn unbounded_preserves_order(input in proptest::collection::vec(any::<u32>(), 1..50)) {
250            let (tx, rx) = unbounded();
251            for v in &input {
252                let _ = tx.send(*v);
253            }
254            let mut drained = Vec::new();
255            while let RecvOutcome::Data(v) = rx.try_recv() {
256                drained.push(v);
257            }
258            prop_assert_eq!(drained, input);
259        }
260    }
261
262    #[test]
263    fn unbounded_mpmc_stress() {
264        let (tx, rx) = unbounded();
265        let tx = Arc::new(tx);
266        let rx = Arc::new(rx);
267        let produced = 4usize * 50usize;
268        let received = Arc::new(AtomicUsize::new(0));
269
270        let mut handles = Vec::new();
271        for _ in 0..4 {
272            let txc = tx.clone();
273            handles.push(thread::spawn(move || {
274                for i in 0..50u32 {
275                    let _ = txc.send(i);
276                }
277            }));
278        }
279
280        let mut recv_handles = Vec::new();
281        for _ in 0..2 {
282            let rxc = rx.clone();
283            let recv_count = received.clone();
284            recv_handles.push(thread::spawn(move || {
285                loop {
286                    match rxc.try_recv() {
287                        RecvOutcome::Data(_) => {
288                            recv_count.fetch_add(1, Ordering::Relaxed);
289                        }
290                        RecvOutcome::Empty => {
291                            if recv_count.load(Ordering::Relaxed) >= produced {
292                                break;
293                            }
294                            std::thread::yield_now();
295                        }
296                        RecvOutcome::Closed => break,
297                    }
298                }
299            }));
300        }
301
302        for h in handles {
303            h.join().unwrap();
304        }
305        drop(tx);
306
307        for h in recv_handles {
308            h.join().unwrap();
309        }
310        assert_eq!(received.load(Ordering::Relaxed), produced);
311    }
312}
313
314#[cfg(all(test, feature = "metrics"))]
315mod metric_tests {
316    use super::*;
317    use crate::metrics::InMemoryMetrics;
318    use std::sync::Arc;
319
320    #[test]
321    fn metrics_record_closed() {
322        let metrics = Arc::new(InMemoryMetrics::default());
323        let collector: Arc<dyn crate::metrics::MetricsSink> = metrics.clone();
324        let (tx, rx) = unbounded_with_metrics(collector);
325        drop(rx);
326        assert_eq!(tx.send(1), Backpressure::Closed);
327        assert_eq!(metrics.counter("channel.unbounded.closed"), 1);
328    }
329}