1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3
4use super::{Backpressure, ChannelRecv, ChannelSend, ChannelStats, CloseBehavior, RecvOutcome};
5use crate::messages::Sequence;
6
7#[cfg(feature = "metrics")]
8use crate::metrics::MetricsSink;
9
/// State shared by every sender and receiver handle of one newest-value channel.
struct NewestInner<T> {
    /// The single storage slot: the most recently sent value paired with the
    /// sequence number assigned to it. `None` until the first send.
    slot: Mutex<Option<(Sequence, Arc<T>)>>,
    /// Counter from which `send` draws the sequence number for each value.
    next_seq: AtomicU64,
    /// Set to `true` by `mark_closed`; nothing in this file resets it.
    closed: AtomicBool,
    /// Number of live sender handles (starts at 1; adjusted by `Clone`/`Drop`).
    senders: AtomicUsize,
    /// Number of live receiver handles (starts at 1; adjusted by `Clone`/`Drop`).
    receivers: AtomicUsize,
    /// Count of values accepted by `send`.
    enqueued: AtomicU64,
    /// Count of sends that replaced a value already present in the slot.
    dropped: AtomicU64,
    /// Count of `try_recv` calls that returned data.
    drained: AtomicU64,
    /// Policy consulted by `try_close` to decide when the channel closes.
    close_behavior: CloseBehavior,
    /// Optional sink that receives counter increments via `inc`.
    #[cfg(feature = "metrics")]
    metrics: Option<Arc<dyn MetricsSink>>,
}
23
24impl<T> NewestInner<T> {
25 fn new(close_behavior: CloseBehavior) -> Self {
26 Self {
27 slot: Mutex::new(None),
28 next_seq: AtomicU64::new(0),
29 closed: AtomicBool::new(false),
30 senders: AtomicUsize::new(1),
31 receivers: AtomicUsize::new(1),
32 enqueued: AtomicU64::new(0),
33 dropped: AtomicU64::new(0),
34 drained: AtomicU64::new(0),
35 close_behavior,
36 #[cfg(feature = "metrics")]
37 metrics: None,
38 }
39 }
40
41 #[cfg(feature = "metrics")]
42 fn new_with_metrics(close_behavior: CloseBehavior, metrics: Arc<dyn MetricsSink>) -> Self {
43 Self {
44 slot: Mutex::new(None),
45 next_seq: AtomicU64::new(0),
46 closed: AtomicBool::new(false),
47 senders: AtomicUsize::new(1),
48 receivers: AtomicUsize::new(1),
49 enqueued: AtomicU64::new(0),
50 dropped: AtomicU64::new(0),
51 drained: AtomicU64::new(0),
52 close_behavior,
53 metrics: Some(metrics),
54 }
55 }
56
57 fn mark_closed(&self) {
58 self.closed.store(true, Ordering::Release);
59 }
60
61 fn try_close(&self) {
62 match self.close_behavior {
63 CloseBehavior::FailFast => {
64 if self.senders.load(Ordering::Acquire) == 0
65 || self.receivers.load(Ordering::Acquire) == 0
66 {
67 self.mark_closed();
68 }
69 }
70 CloseBehavior::DrainUntilSendersDone => {
71 if self.senders.load(Ordering::Acquire) == 0 {
72 self.mark_closed();
73 }
74 }
75 }
76 }
77
78 #[cfg(feature = "metrics")]
79 fn inc(&self, key: &'static str) {
80 if let Some(metrics) = &self.metrics {
81 metrics.increment(key, 1);
82 }
83 }
84}
85
/// Sending half of a newest-value channel.
///
/// Cloning a sender registers another live sender on the shared state and
/// dropping one unregisters it (see the `Clone` and `Drop` impls).
pub struct NewestSender<T> {
    /// State shared with every other handle of the same channel.
    inner: Arc<NewestInner<T>>,
}
89
90impl<T> Clone for NewestSender<T> {
91 fn clone(&self) -> Self {
92 self.inner.senders.fetch_add(1, Ordering::Relaxed);
93 Self {
94 inner: Arc::clone(&self.inner),
95 }
96 }
97}
98
99impl<T> Drop for NewestSender<T> {
100 fn drop(&mut self) {
101 self.inner.senders.fetch_sub(1, Ordering::Relaxed);
102 self.inner.try_close();
103 }
104}
105
106impl<T> std::fmt::Debug for NewestSender<T> {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("NewestSender").finish_non_exhaustive()
109 }
110}
111
112pub struct NewestReceiver<T> {
113 inner: Arc<NewestInner<T>>,
114 last_seen: Mutex<Option<Sequence>>,
115}
116
117impl<T> Clone for NewestReceiver<T> {
118 fn clone(&self) -> Self {
119 self.inner.receivers.fetch_add(1, Ordering::Relaxed);
120 Self {
121 inner: Arc::clone(&self.inner),
122 last_seen: Mutex::new(None),
123 }
124 }
125}
126
127impl<T> Drop for NewestReceiver<T> {
128 fn drop(&mut self) {
129 self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
130 self.inner.try_close();
131 }
132}
133
134pub fn newest<T>() -> (NewestSender<T>, NewestReceiver<T>) {
135 let inner = Arc::new(NewestInner::new(CloseBehavior::FailFast));
136 (
137 NewestSender {
138 inner: Arc::clone(&inner),
139 },
140 NewestReceiver {
141 inner,
142 last_seen: Mutex::new(None),
143 },
144 )
145}
146
/// Creates a newest-value channel with [`CloseBehavior::FailFast`] that reports
/// counter increments to `metrics`.
///
/// Returns the sender and receiver halves, both backed by the same shared state.
#[cfg(feature = "metrics")]
pub fn newest_with_metrics<T>(
    metrics: Arc<dyn MetricsSink>,
) -> (NewestSender<T>, NewestReceiver<T>) {
    let state = NewestInner::new_with_metrics(CloseBehavior::FailFast, metrics);
    let shared = Arc::new(state);
    let sender = NewestSender {
        inner: Arc::clone(&shared),
    };
    let receiver = NewestReceiver {
        inner: shared,
        last_seen: Mutex::new(None),
    };
    (sender, receiver)
}
165
166pub fn newest_with_behavior<T>(
167 close_behavior: CloseBehavior,
168) -> (NewestSender<T>, NewestReceiver<T>) {
169 let inner = Arc::new(NewestInner::new(close_behavior));
170 (
171 NewestSender {
172 inner: Arc::clone(&inner),
173 },
174 NewestReceiver {
175 inner,
176 last_seen: Mutex::new(None),
177 },
178 )
179}
180
/// Creates a newest-value channel that closes according to `close_behavior`
/// and reports counter increments to `metrics`.
///
/// Returns the sender and receiver halves, both backed by the same shared state.
#[cfg(feature = "metrics")]
pub fn newest_with_metrics_and_behavior<T>(
    close_behavior: CloseBehavior,
    metrics: Arc<dyn MetricsSink>,
) -> (NewestSender<T>, NewestReceiver<T>) {
    let state = NewestInner::new_with_metrics(close_behavior, metrics);
    let shared = Arc::new(state);
    let sender = NewestSender {
        inner: Arc::clone(&shared),
    };
    let receiver = NewestReceiver {
        inner: shared,
        last_seen: Mutex::new(None),
    };
    (sender, receiver)
}
197
198impl<T: Send + Sync> ChannelSend<Arc<T>> for NewestSender<T> {
199 fn send(&self, value: Arc<T>) -> Backpressure {
200 if self.inner.closed.load(Ordering::Acquire) {
201 #[cfg(feature = "metrics")]
202 self.inner.inc("channel.newest.closed");
203 return Backpressure::Closed;
204 }
205 let seq = Sequence::new(self.inner.next_seq.fetch_add(1, Ordering::Relaxed));
206 let mut guard = self.inner.slot.lock().expect("newest slot lock poisoned");
207 let dropped = guard.replace((seq, value)).is_some();
208 if dropped {
209 self.inner.dropped.fetch_add(1, Ordering::Relaxed);
210 #[cfg(feature = "metrics")]
211 self.inner.inc("channel.newest.dropped");
212 }
213 self.inner.enqueued.fetch_add(1, Ordering::Relaxed);
214 Backpressure::Ok
215 }
216}
217
218impl<T: Send + Sync> ChannelRecv<Arc<T>> for NewestReceiver<T> {
219 fn try_recv(&self) -> RecvOutcome<Arc<T>> {
220 let mut last_seen = self
221 .last_seen
222 .lock()
223 .expect("newest receiver lock poisoned");
224 let guard = self.inner.slot.lock().expect("newest slot lock poisoned");
225 let Some((seq, value)) = guard.as_ref() else {
226 return if self.inner.closed.load(Ordering::Acquire) {
227 RecvOutcome::Closed
228 } else {
229 RecvOutcome::Empty
230 };
231 };
232 if last_seen.map(|seen| seq <= &seen).unwrap_or(false) {
233 return if self.inner.closed.load(Ordering::Acquire) {
234 RecvOutcome::Closed
235 } else {
236 RecvOutcome::Empty
237 };
238 }
239 *last_seen = Some(*seq);
240 self.inner.drained.fetch_add(1, Ordering::Relaxed);
241 RecvOutcome::Data(Arc::clone(value))
242 }
243}
244
245impl<T> NewestReceiver<T> {
246 pub fn stats(&self) -> ChannelStats {
247 ChannelStats {
248 enqueued: self.inner.enqueued.load(Ordering::Relaxed),
249 dropped: self.inner.dropped.load(Ordering::Relaxed),
250 drained: self.inner.drained.load(Ordering::Relaxed),
251 depth: 0,
252 closed: self.inner.closed.load(Ordering::Relaxed),
253 }
254 }
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// A second send replaces the first, and the receiver sees the latest value exactly once.
    #[test]
    fn newest_overwrites() {
        let (tx, rx) = newest::<u64>();
        assert_eq!(tx.send(Arc::new(1)), Backpressure::Ok);
        assert_eq!(tx.send(Arc::new(2)), Backpressure::Ok);
        assert_eq!(rx.try_recv(), RecvOutcome::Data(Arc::new(2)));
        // Both halves are still alive, so the channel is open and has nothing new.
        assert_eq!(rx.try_recv(), RecvOutcome::Empty);
    }

    /// Dropping the only sender of a `FailFast` channel closes it.
    #[test]
    fn newest_closed_when_senders_drop() {
        let (tx, rx) = newest::<u64>();
        drop(tx);
        // Nothing was ever sent, so the closed flag decides the outcome.
        assert_eq!(rx.try_recv(), RecvOutcome::Closed);
    }

    proptest! {
        // After any burst of sends, draining the receiver yields exactly the last value sent.
        #[test]
        fn newest_returns_monotonic_sequences(values in proptest::collection::vec(any::<u32>(), 1..50)) {
            let (tx, rx) = newest::<u32>();
            for v in &values {
                let _ = tx.send(Arc::new(*v));
            }
            let mut seen = Vec::new();
            while let RecvOutcome::Data(v) = rx.try_recv() {
                seen.push(*v);
            }
            // Expected is the final input if there is one, otherwise nothing.
            let expected: Vec<u32> = values.last().copied().into_iter().collect();
            prop_assert_eq!(seen, expected);
        }
    }
}
304
#[cfg(all(test, feature = "metrics"))]
mod metric_tests {
    use super::*;
    use crate::metrics::InMemoryMetrics;
    use std::sync::Arc;

    /// Overwriting a pending value bumps the `dropped` counter, and sending after
    /// the receiver is gone bumps the `closed` counter.
    #[test]
    fn metrics_record_dropped_and_closed() {
        let recorder = Arc::new(InMemoryMetrics::default());
        let sink: Arc<dyn crate::metrics::MetricsSink> = recorder.clone();
        let (tx, rx) = newest_with_metrics::<u64>(sink);

        // The second send replaces the first, which was never received.
        let _ = tx.send(Arc::new(1));
        let _ = tx.send(Arc::new(2));
        assert_eq!(recorder.counter("channel.newest.dropped"), 1);

        // With the only receiver gone, the channel closes and rejects sends.
        drop(rx);
        let outcome = tx.send(Arc::new(3));
        assert_eq!(outcome, Backpressure::Closed);
        assert_eq!(recorder.counter("channel.newest.closed"), 1);
    }
}