daedalus_core/channels/
newest.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3
4use super::{Backpressure, ChannelRecv, ChannelSend, ChannelStats, CloseBehavior, RecvOutcome};
5use crate::messages::Sequence;
6
7#[cfg(feature = "metrics")]
8use crate::metrics::MetricsSink;
9
/// State shared by every sender and receiver handle of one "newest" channel.
///
/// The channel holds at most one value: each send overwrites the slot, and
/// receivers use the stored sequence number to tell whether the slot changed
/// since they last looked.
struct NewestInner<T> {
    /// The most recently sent value together with the sequence number it was
    /// assigned; starts out as `None`.
    slot: Mutex<Option<(Sequence, Arc<T>)>>,
    /// Counter from which sequence numbers are drawn (one per accepted send).
    next_seq: AtomicU64,
    /// Set to `true` by `mark_closed`; nothing in this file resets it.
    closed: AtomicBool,
    /// Number of live sender handles (starts at 1, adjusted by `Clone`/`Drop`).
    senders: AtomicUsize,
    /// Number of live receiver handles (starts at 1, adjusted by `Clone`/`Drop`).
    receivers: AtomicUsize,
    /// Total number of values accepted by `send`.
    enqueued: AtomicU64,
    /// Number of sends that replaced a value already present in the slot.
    dropped: AtomicU64,
    /// Number of times a receiver obtained a value from `try_recv`.
    drained: AtomicU64,
    /// Rule used by `try_close` to decide when the channel becomes closed.
    close_behavior: CloseBehavior,
    /// Optional sink that receives named counter increments.
    #[cfg(feature = "metrics")]
    metrics: Option<Arc<dyn MetricsSink>>,
}
23
24impl<T> NewestInner<T> {
25    fn new(close_behavior: CloseBehavior) -> Self {
26        Self {
27            slot: Mutex::new(None),
28            next_seq: AtomicU64::new(0),
29            closed: AtomicBool::new(false),
30            senders: AtomicUsize::new(1),
31            receivers: AtomicUsize::new(1),
32            enqueued: AtomicU64::new(0),
33            dropped: AtomicU64::new(0),
34            drained: AtomicU64::new(0),
35            close_behavior,
36            #[cfg(feature = "metrics")]
37            metrics: None,
38        }
39    }
40
41    #[cfg(feature = "metrics")]
42    fn new_with_metrics(close_behavior: CloseBehavior, metrics: Arc<dyn MetricsSink>) -> Self {
43        Self {
44            slot: Mutex::new(None),
45            next_seq: AtomicU64::new(0),
46            closed: AtomicBool::new(false),
47            senders: AtomicUsize::new(1),
48            receivers: AtomicUsize::new(1),
49            enqueued: AtomicU64::new(0),
50            dropped: AtomicU64::new(0),
51            drained: AtomicU64::new(0),
52            close_behavior,
53            metrics: Some(metrics),
54        }
55    }
56
57    fn mark_closed(&self) {
58        self.closed.store(true, Ordering::Release);
59    }
60
61    fn try_close(&self) {
62        match self.close_behavior {
63            CloseBehavior::FailFast => {
64                if self.senders.load(Ordering::Acquire) == 0
65                    || self.receivers.load(Ordering::Acquire) == 0
66                {
67                    self.mark_closed();
68                }
69            }
70            CloseBehavior::DrainUntilSendersDone => {
71                if self.senders.load(Ordering::Acquire) == 0 {
72                    self.mark_closed();
73                }
74            }
75        }
76    }
77
78    #[cfg(feature = "metrics")]
79    fn inc(&self, key: &'static str) {
80        if let Some(metrics) = &self.metrics {
81            metrics.increment(key, 1);
82        }
83    }
84}
85
/// Sending half of a "newest" channel.
///
/// Cloning produces another sender for the same channel; the channel tracks
/// how many senders are alive and re-evaluates its close condition whenever
/// one is dropped.
pub struct NewestSender<T> {
    /// State shared with all other handles of this channel.
    inner: Arc<NewestInner<T>>,
}
89
90impl<T> Clone for NewestSender<T> {
91    fn clone(&self) -> Self {
92        self.inner.senders.fetch_add(1, Ordering::Relaxed);
93        Self {
94            inner: Arc::clone(&self.inner),
95        }
96    }
97}
98
99impl<T> Drop for NewestSender<T> {
100    fn drop(&mut self) {
101        self.inner.senders.fetch_sub(1, Ordering::Relaxed);
102        self.inner.try_close();
103    }
104}
105
106impl<T> std::fmt::Debug for NewestSender<T> {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("NewestSender").finish_non_exhaustive()
109    }
110}
111
/// Receiving half of a "newest" channel.
///
/// Each receiver remembers the sequence number of the last value it returned,
/// so the same value is not handed to the same receiver twice.
pub struct NewestReceiver<T> {
    /// State shared with all other handles of this channel.
    inner: Arc<NewestInner<T>>,
    /// Sequence number of the value this receiver returned most recently;
    /// `None` until it has returned one.
    last_seen: Mutex<Option<Sequence>>,
}
116
117impl<T> Clone for NewestReceiver<T> {
118    fn clone(&self) -> Self {
119        self.inner.receivers.fetch_add(1, Ordering::Relaxed);
120        Self {
121            inner: Arc::clone(&self.inner),
122            last_seen: Mutex::new(None),
123        }
124    }
125}
126
127impl<T> Drop for NewestReceiver<T> {
128    fn drop(&mut self) {
129        self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
130        self.inner.try_close();
131    }
132}
133
134pub fn newest<T>() -> (NewestSender<T>, NewestReceiver<T>) {
135    let inner = Arc::new(NewestInner::new(CloseBehavior::FailFast));
136    (
137        NewestSender {
138            inner: Arc::clone(&inner),
139        },
140        NewestReceiver {
141            inner,
142            last_seen: Mutex::new(None),
143        },
144    )
145}
146
/// Creates a "newest" channel that uses [`CloseBehavior::FailFast`] and
/// reports counter increments to `metrics`.
#[cfg(feature = "metrics")]
pub fn newest_with_metrics<T>(
    metrics: Arc<dyn MetricsSink>,
) -> (NewestSender<T>, NewestReceiver<T>) {
    let state = NewestInner::new_with_metrics(CloseBehavior::FailFast, metrics);
    let shared = Arc::new(state);
    let sender = NewestSender {
        inner: Arc::clone(&shared),
    };
    let receiver = NewestReceiver {
        inner: shared,
        last_seen: Mutex::new(None),
    };
    (sender, receiver)
}
165
166pub fn newest_with_behavior<T>(
167    close_behavior: CloseBehavior,
168) -> (NewestSender<T>, NewestReceiver<T>) {
169    let inner = Arc::new(NewestInner::new(close_behavior));
170    (
171        NewestSender {
172            inner: Arc::clone(&inner),
173        },
174        NewestReceiver {
175            inner,
176            last_seen: Mutex::new(None),
177        },
178    )
179}
180
/// Creates a "newest" channel with the given `close_behavior` that reports
/// counter increments to `metrics`.
#[cfg(feature = "metrics")]
pub fn newest_with_metrics_and_behavior<T>(
    close_behavior: CloseBehavior,
    metrics: Arc<dyn MetricsSink>,
) -> (NewestSender<T>, NewestReceiver<T>) {
    let state = NewestInner::new_with_metrics(close_behavior, metrics);
    let shared = Arc::new(state);
    let sender = NewestSender {
        inner: Arc::clone(&shared),
    };
    let receiver = NewestReceiver {
        inner: shared,
        last_seen: Mutex::new(None),
    };
    (sender, receiver)
}
197
198impl<T: Send + Sync> ChannelSend<Arc<T>> for NewestSender<T> {
199    fn send(&self, value: Arc<T>) -> Backpressure {
200        if self.inner.closed.load(Ordering::Acquire) {
201            #[cfg(feature = "metrics")]
202            self.inner.inc("channel.newest.closed");
203            return Backpressure::Closed;
204        }
205        let seq = Sequence::new(self.inner.next_seq.fetch_add(1, Ordering::Relaxed));
206        let mut guard = self.inner.slot.lock().expect("newest slot lock poisoned");
207        let dropped = guard.replace((seq, value)).is_some();
208        if dropped {
209            self.inner.dropped.fetch_add(1, Ordering::Relaxed);
210            #[cfg(feature = "metrics")]
211            self.inner.inc("channel.newest.dropped");
212        }
213        self.inner.enqueued.fetch_add(1, Ordering::Relaxed);
214        Backpressure::Ok
215    }
216}
217
218impl<T: Send + Sync> ChannelRecv<Arc<T>> for NewestReceiver<T> {
219    fn try_recv(&self) -> RecvOutcome<Arc<T>> {
220        let mut last_seen = self
221            .last_seen
222            .lock()
223            .expect("newest receiver lock poisoned");
224        let guard = self.inner.slot.lock().expect("newest slot lock poisoned");
225        let Some((seq, value)) = guard.as_ref() else {
226            return if self.inner.closed.load(Ordering::Acquire) {
227                RecvOutcome::Closed
228            } else {
229                RecvOutcome::Empty
230            };
231        };
232        if last_seen.map(|seen| seq <= &seen).unwrap_or(false) {
233            return if self.inner.closed.load(Ordering::Acquire) {
234                RecvOutcome::Closed
235            } else {
236                RecvOutcome::Empty
237            };
238        }
239        *last_seen = Some(*seq);
240        self.inner.drained.fetch_add(1, Ordering::Relaxed);
241        RecvOutcome::Data(Arc::clone(value))
242    }
243}
244
245impl<T> NewestReceiver<T> {
246    pub fn stats(&self) -> ChannelStats {
247        ChannelStats {
248            enqueued: self.inner.enqueued.load(Ordering::Relaxed),
249            dropped: self.inner.dropped.load(Ordering::Relaxed),
250            drained: self.inner.drained.load(Ordering::Relaxed),
251            depth: 0,
252            closed: self.inner.closed.load(Ordering::Relaxed),
253        }
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// A second send replaces the first: the receiver gets the newest value
    /// exactly once, then sees an open-but-empty channel.
    #[test]
    fn newest_overwrites() {
        let (tx, rx) = newest::<u64>();
        assert_eq!(tx.send(Arc::new(1)), Backpressure::Ok);
        assert_eq!(tx.send(Arc::new(2)), Backpressure::Ok);
        assert_eq!(rx.try_recv(), RecvOutcome::Data(Arc::new(2)));
        // `tx` is still alive, so "nothing new" must be Empty, not Closed.
        assert_eq!(rx.try_recv(), RecvOutcome::Empty);
    }

    /// Dropping the only sender of a FailFast channel closes it; with nothing
    /// in the slot the receiver must report Closed.
    #[test]
    fn newest_closed_when_senders_drop() {
        let (tx, rx) = newest::<u64>();
        drop(tx);
        assert_eq!(rx.try_recv(), RecvOutcome::Closed);
    }

    proptest! {
        /// However many values are sent, draining the receiver yields exactly
        /// one item: the last value written.
        #[test]
        fn newest_returns_monotonic_sequences(values in proptest::collection::vec(any::<u32>(), 1..50)) {
            let (tx, rx) = newest::<u32>();
            for v in &values {
                prop_assert_eq!(tx.send(Arc::new(*v)), Backpressure::Ok);
            }
            let mut seen = Vec::new();
            while let RecvOutcome::Data(v) = rx.try_recv() {
                seen.push(*v);
            }
            // The strategy generates 1..50 elements, so a last value always exists.
            let last = *values.last().expect("strategy yields at least one value");
            prop_assert_eq!(seen, vec![last]);
        }
    }
}
304
#[cfg(all(test, feature = "metrics"))]
mod metric_tests {
    use super::*;
    use crate::metrics::InMemoryMetrics;
    use std::sync::Arc;

    /// An overwriting send bumps the "dropped" counter, and a send after the
    /// receiver is gone bumps the "closed" counter.
    #[test]
    fn metrics_record_dropped_and_closed() {
        let sink = Arc::new(InMemoryMetrics::default());
        let (tx, rx) = newest_with_metrics::<u64>(sink.clone());

        tx.send(Arc::new(1));
        tx.send(Arc::new(2));
        assert_eq!(sink.counter("channel.newest.dropped"), 1);

        drop(rx);
        let outcome = tx.send(Arc::new(3));
        assert_eq!(outcome, Backpressure::Closed);
        assert_eq!(sink.counter("channel.newest.closed"), 1);
    }
}
323}