Skip to main content

soil_client/utils/
mpsc.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Code to meter unbounded channels.
8
9pub use async_channel::{TryRecvError, TrySendError};
10
11use crate::utils::metrics::{
12	DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
13};
14use async_channel::{Receiver, Sender};
15use futures::{
16	stream::{FusedStream, Stream},
17	task::{Context, Poll},
18};
19use log::error;
20use std::{
21	backtrace::Backtrace,
22	pin::Pin,
23	sync::{
24		atomic::{AtomicBool, Ordering},
25		Arc,
26	},
27};
28use subsoil::arithmetic::traits::SaturatedConversion;
29
30/// Wrapper Type around [`async_channel::Sender`] that increases the global
31/// measure when a message is added.
32#[derive(Debug)]
33pub struct TracingUnboundedSender<T> {
34	inner: Sender<T>,
35	name: &'static str,
36	queue_size_warning: usize,
37	warning_fired: Arc<AtomicBool>,
38	creation_backtrace: Arc<Backtrace>,
39}
40
41// Strangely, deriving `Clone` requires that `T` is also `Clone`.
42impl<T> Clone for TracingUnboundedSender<T> {
43	fn clone(&self) -> Self {
44		Self {
45			inner: self.inner.clone(),
46			name: self.name,
47			queue_size_warning: self.queue_size_warning,
48			warning_fired: self.warning_fired.clone(),
49			creation_backtrace: self.creation_backtrace.clone(),
50		}
51	}
52}
53
54/// Wrapper Type around [`async_channel::Receiver`] that decreases the global
55/// measure when a message is polled.
56#[derive(Debug)]
57pub struct TracingUnboundedReceiver<T> {
58	inner: Receiver<T>,
59	name: &'static str,
60}
61
62/// Wrapper around [`async_channel::unbounded`] that tracks the in- and outflow via
63/// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows
64/// above the warning threshold.
65pub fn tracing_unbounded<T>(
66	name: &'static str,
67	queue_size_warning: usize,
68) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
69	let (s, r) = async_channel::unbounded();
70	let sender = TracingUnboundedSender {
71		inner: s,
72		name,
73		queue_size_warning,
74		warning_fired: Arc::new(AtomicBool::new(false)),
75		creation_backtrace: Arc::new(Backtrace::force_capture()),
76	};
77	let receiver = TracingUnboundedReceiver { inner: r, name: name.into() };
78	(sender, receiver)
79}
80
81impl<T> TracingUnboundedSender<T> {
82	/// Proxy function to [`async_channel::Sender`].
83	pub fn is_closed(&self) -> bool {
84		self.inner.is_closed()
85	}
86
87	/// Proxy function to [`async_channel::Sender`].
88	pub fn close(&self) -> bool {
89		self.inner.close()
90	}
91
92	/// Proxy function to `async_channel::Sender::try_send`.
93	pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
94		self.inner.try_send(msg).inspect(|_| {
95			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
96			UNBOUNDED_CHANNELS_SIZE
97				.with_label_values(&[self.name])
98				.set(self.inner.len().saturated_into());
99
100			if self.inner.len() >= self.queue_size_warning
101				&& self
102					.warning_fired
103					.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
104					.is_ok()
105			{
106				error!(
107					"The number of unprocessed messages in channel `{}` exceeded {}.\n\
108					 The channel was created at:\n{}\n
109					 Last message was sent from:\n{}",
110					self.name,
111					self.queue_size_warning,
112					self.creation_backtrace,
113					Backtrace::force_capture(),
114				);
115			}
116		})
117	}
118
119	/// The number of elements in the channel (proxy function to [`async_channel::Sender`]).
120	pub fn len(&self) -> usize {
121		self.inner.len()
122	}
123}
124
125impl<T> TracingUnboundedReceiver<T> {
126	/// Proxy function to [`async_channel::Receiver`].
127	pub fn close(&mut self) -> bool {
128		self.inner.close()
129	}
130
131	/// Proxy function to [`async_channel::Receiver`]
132	/// that discounts the messages taken out.
133	pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
134		self.inner.try_recv().inspect(|_| {
135			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
136			UNBOUNDED_CHANNELS_SIZE
137				.with_label_values(&[self.name])
138				.set(self.inner.len().saturated_into());
139		})
140	}
141
142	/// The number of elements in the channel (proxy function to [`async_channel::Receiver`]).
143	pub fn len(&self) -> usize {
144		self.inner.len()
145	}
146
147	/// The name of this receiver
148	pub fn name(&self) -> &'static str {
149		self.name
150	}
151}
152
153impl<T> Drop for TracingUnboundedReceiver<T> {
154	fn drop(&mut self) {
155		// Close the channel to prevent any further messages to be sent into the channel
156		self.close();
157		// The number of messages about to be dropped
158		let count = self.inner.len();
159		// Discount the messages
160		if count > 0 {
161			UNBOUNDED_CHANNELS_COUNTER
162				.with_label_values(&[self.name, DROPPED_LABEL])
163				.inc_by(count.saturated_into());
164		}
165		// Reset the size metric to 0
166		UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
167		// Drain all the pending messages in the channel since they can never be accessed,
168		// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
169		// resolved
170		while let Ok(_) = self.inner.try_recv() {}
171	}
172}
173
174impl<T> Unpin for TracingUnboundedReceiver<T> {}
175
176impl<T> Stream for TracingUnboundedReceiver<T> {
177	type Item = T;
178
179	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
180		let s = self.get_mut();
181		match Pin::new(&mut s.inner).poll_next(cx) {
182			Poll::Ready(msg) => {
183				if msg.is_some() {
184					UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
185					UNBOUNDED_CHANNELS_SIZE
186						.with_label_values(&[s.name])
187						.set(s.inner.len().saturated_into());
188				}
189				Poll::Ready(msg)
190			},
191			Poll::Pending => Poll::Pending,
192		}
193	}
194}
195
196impl<T> FusedStream for TracingUnboundedReceiver<T> {
197	fn is_terminated(&self) -> bool {
198		self.inner.is_terminated()
199	}
200}
201
202#[cfg(test)]
203mod tests {
204	use super::tracing_unbounded;
205	use async_channel::{self, RecvError, TryRecvError};
206
207	#[test]
208	fn test_tracing_unbounded_receiver_drop() {
209		let (tracing_unbounded_sender, tracing_unbounded_receiver) =
210			tracing_unbounded("test-receiver-drop", 10);
211		let (tx, rx) = async_channel::unbounded::<usize>();
212
213		tracing_unbounded_sender.unbounded_send(tx).unwrap();
214		drop(tracing_unbounded_receiver);
215
216		assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
217		assert_eq!(rx.recv_blocking(), Err(RecvError));
218	}
219}