1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
use std::time::Duration;

use crate::channel::{Receiver, RecvError, RecvTimeoutError, TryRecvError, CHANNEL_SIZE_PREFIX};
use crate::metrics::metric::{Counter, Gauge};

/// Receiving half of a channel that updates metrics on every successful receive.
///
/// Each message successfully taken from the wrapped [`Receiver`] decrements the
/// `size` gauge by one and increments the `drain_counter` by one. Failed
/// receives leave both metrics untouched.
#[derive(Clone, Debug)]
pub struct ChannelReceiver<T>
where
    T: Clone,
{
    /// Channel name as supplied to [`ChannelReceiver::new`].
    name: String,
    /// `CHANNEL_SIZE_PREFIX` immediately followed by `name`.
    guava_size_name: String,
    /// The wrapped receiver that every receive call is forwarded to.
    pub(crate) receiver: Receiver<T>,
    /// Gauge decremented by one for each message received.
    size: Gauge,
    /// Counter incremented by one for each message received.
    drain_counter: Counter,
}

impl<T> ChannelReceiver<T>
where
    T: Clone,
{
    /// Wraps `receiver`, associating it with `name` and the two metrics that
    /// are updated on every successful receive.
    pub fn new(name: &str, receiver: Receiver<T>, size: Gauge, drain_counter: Counter) -> Self {
        let guava_size_name = format!("{}{}", CHANNEL_SIZE_PREFIX, name);
        Self {
            name: name.to_owned(),
            guava_size_name,
            receiver,
            size,
            drain_counter,
        }
    }

    /// Records one drained message: the size gauge goes down by one, then the
    /// drain counter goes up by one.
    #[inline]
    fn on_success(&self) {
        self.size.fetch_sub(1_i64);
        self.drain_counter.fetch_add(1_u64);
    }

    /// Forwards to the wrapped receiver's `try_recv`.
    ///
    /// On success the metrics are updated before the message is returned; on
    /// failure the error is passed through unchanged.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let message = self.receiver.try_recv()?;
        self.on_success();
        Ok(message)
    }

    /// Forwards to the wrapped receiver's `recv`.
    ///
    /// On success the metrics are updated before the message is returned; on
    /// failure the error is passed through unchanged.
    pub fn recv(&self) -> Result<T, RecvError> {
        let message = self.receiver.recv()?;
        self.on_success();
        Ok(message)
    }

    /// Forwards to the wrapped receiver's `recv_timeout` with the given `timeout`.
    ///
    /// On success the metrics are updated before the message is returned; on
    /// failure the error is passed through unchanged.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        let message = self.receiver.recv_timeout(timeout)?;
        self.on_success();
        Ok(message)
    }
}