soil_client/utils/
mpsc.rs1pub use async_channel::{TryRecvError, TrySendError};
10
11use crate::utils::metrics::{
12 DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
13};
14use async_channel::{Receiver, Sender};
15use futures::{
16 stream::{FusedStream, Stream},
17 task::{Context, Poll},
18};
19use log::error;
20use std::{
21 backtrace::Backtrace,
22 pin::Pin,
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc,
26 },
27};
28use subsoil::arithmetic::traits::SaturatedConversion;
29
30#[derive(Debug)]
33pub struct TracingUnboundedSender<T> {
34 inner: Sender<T>,
35 name: &'static str,
36 queue_size_warning: usize,
37 warning_fired: Arc<AtomicBool>,
38 creation_backtrace: Arc<Backtrace>,
39}
40
41impl<T> Clone for TracingUnboundedSender<T> {
43 fn clone(&self) -> Self {
44 Self {
45 inner: self.inner.clone(),
46 name: self.name,
47 queue_size_warning: self.queue_size_warning,
48 warning_fired: self.warning_fired.clone(),
49 creation_backtrace: self.creation_backtrace.clone(),
50 }
51 }
52}
53
54#[derive(Debug)]
57pub struct TracingUnboundedReceiver<T> {
58 inner: Receiver<T>,
59 name: &'static str,
60}
61
62pub fn tracing_unbounded<T>(
66 name: &'static str,
67 queue_size_warning: usize,
68) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
69 let (s, r) = async_channel::unbounded();
70 let sender = TracingUnboundedSender {
71 inner: s,
72 name,
73 queue_size_warning,
74 warning_fired: Arc::new(AtomicBool::new(false)),
75 creation_backtrace: Arc::new(Backtrace::force_capture()),
76 };
77 let receiver = TracingUnboundedReceiver { inner: r, name: name.into() };
78 (sender, receiver)
79}
80
81impl<T> TracingUnboundedSender<T> {
82 pub fn is_closed(&self) -> bool {
84 self.inner.is_closed()
85 }
86
87 pub fn close(&self) -> bool {
89 self.inner.close()
90 }
91
92 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
94 self.inner.try_send(msg).inspect(|_| {
95 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
96 UNBOUNDED_CHANNELS_SIZE
97 .with_label_values(&[self.name])
98 .set(self.inner.len().saturated_into());
99
100 if self.inner.len() >= self.queue_size_warning
101 && self
102 .warning_fired
103 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
104 .is_ok()
105 {
106 error!(
107 "The number of unprocessed messages in channel `{}` exceeded {}.\n\
108 The channel was created at:\n{}\n
109 Last message was sent from:\n{}",
110 self.name,
111 self.queue_size_warning,
112 self.creation_backtrace,
113 Backtrace::force_capture(),
114 );
115 }
116 })
117 }
118
119 pub fn len(&self) -> usize {
121 self.inner.len()
122 }
123}
124
125impl<T> TracingUnboundedReceiver<T> {
126 pub fn close(&mut self) -> bool {
128 self.inner.close()
129 }
130
131 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
134 self.inner.try_recv().inspect(|_| {
135 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
136 UNBOUNDED_CHANNELS_SIZE
137 .with_label_values(&[self.name])
138 .set(self.inner.len().saturated_into());
139 })
140 }
141
142 pub fn len(&self) -> usize {
144 self.inner.len()
145 }
146
147 pub fn name(&self) -> &'static str {
149 self.name
150 }
151}
152
153impl<T> Drop for TracingUnboundedReceiver<T> {
154 fn drop(&mut self) {
155 self.close();
157 let count = self.inner.len();
159 if count > 0 {
161 UNBOUNDED_CHANNELS_COUNTER
162 .with_label_values(&[self.name, DROPPED_LABEL])
163 .inc_by(count.saturated_into());
164 }
165 UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
167 while let Ok(_) = self.inner.try_recv() {}
171 }
172}
173
174impl<T> Unpin for TracingUnboundedReceiver<T> {}
175
176impl<T> Stream for TracingUnboundedReceiver<T> {
177 type Item = T;
178
179 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
180 let s = self.get_mut();
181 match Pin::new(&mut s.inner).poll_next(cx) {
182 Poll::Ready(msg) => {
183 if msg.is_some() {
184 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
185 UNBOUNDED_CHANNELS_SIZE
186 .with_label_values(&[s.name])
187 .set(s.inner.len().saturated_into());
188 }
189 Poll::Ready(msg)
190 },
191 Poll::Pending => Poll::Pending,
192 }
193 }
194}
195
196impl<T> FusedStream for TracingUnboundedReceiver<T> {
197 fn is_terminated(&self) -> bool {
198 self.inner.is_terminated()
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::tracing_unbounded;
205 use async_channel::{self, RecvError, TryRecvError};
206
207 #[test]
208 fn test_tracing_unbounded_receiver_drop() {
209 let (tracing_unbounded_sender, tracing_unbounded_receiver) =
210 tracing_unbounded("test-receiver-drop", 10);
211 let (tx, rx) = async_channel::unbounded::<usize>();
212
213 tracing_unbounded_sender.unbounded_send(tx).unwrap();
214 drop(tracing_unbounded_receiver);
215
216 assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
217 assert_eq!(rx.recv_blocking(), Err(RecvError));
218 }
219}