measured_future_rs/report_sinks/
mpsc_unbounded.rs

1use std::fmt;
2
3use ::futures::channel::mpsc;
4
5use crate::report_sink::BoxedReportSink;
6use crate::report_sink::ReportSink;
7
8pub struct MpscUnboundedSink<R>(mpsc::UnboundedSender<R>);
9
10impl<R> MpscUnboundedSink<R>
11where
12    R: Send + Sync + 'static,
13{
14    pub fn create() -> (Self, mpsc::UnboundedReceiver<R>) {
15        let (tx, rx) = mpsc::unbounded();
16        (Self(tx), rx)
17    }
18    pub fn from_tx(tx: mpsc::UnboundedSender<R>) -> Self {
19        Self(tx)
20    }
21}
22
23impl<R> Clone for MpscUnboundedSink<R> {
24    fn clone(&self) -> Self {
25        Self(self.0.clone())
26    }
27}
28
29impl<R> fmt::Debug for MpscUnboundedSink<R> {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct(std::any::type_name::<Self>()).finish()
32    }
33}
34
35impl<R> ReportSink<R> for MpscUnboundedSink<R>
36where
37    R: Send + Sync + 'static,
38{
39    fn send_report(&mut self, report: R) {
40        if let Err(_reason) = self.0.unbounded_send(report) {
41            #[cfg(feature = "debug-logs")]
42            log::error!("{:?}: failed to send_report: {}", self, _reason);
43        }
44    }
45
46    fn flush(&mut self) {}
47
48    fn clone_sink(&mut self) -> BoxedReportSink<R> {
49        BoxedReportSink::new(self.clone())
50    }
51}