1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Cadence - An extensible Statsd client for Rust!
//
// Copyright 2015-2021 Nick Pillitteri
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// I/O telemetry for a `MetricSink` implementation.
#[derive(Clone, Debug, Default)]
pub struct SinkStats {
    pub bytes_sent: u64,
    pub packets_sent: u64,
    pub bytes_dropped: u64,
    pub packets_dropped: u64,
}

/// Thread-safe collection of stats updated by network sinks.
///
/// This struct is meant to be updated internally by `MetricSink` implementations
/// and converted to an instance of `SinkStats` for consumption by external callers.
///
/// # Example
///
/// ```
/// use std::net::{SocketAddr, UdpSocket};
/// use cadence::ext::SocketStats;
/// use cadence::{MetricSink, SinkStats};
///
/// pub struct MyCustomSink {
///     addr: SocketAddr,
///     socket: UdpSocket,
///     stats: SocketStats,
/// }
///
/// impl MetricSink for MyCustomSink {
///     fn emit(&self, metric: &str) -> std::io::Result<usize> {
///         let res = self.socket.send_to(metric.as_bytes(), &self.addr);
///         self.stats.update(res, metric.len())
///     }
///
///     fn stats(&self) -> SinkStats {
///        (&self.stats).into()
///     }
/// }
///
/// ```
#[derive(Debug, Clone, Default)]
pub struct SocketStats {
    bytes_sent: Arc<AtomicU64>,
    packets_sent: Arc<AtomicU64>,
    bytes_dropped: Arc<AtomicU64>,
    packets_dropped: Arc<AtomicU64>,
}

impl SocketStats {
    pub fn incr_bytes_sent(&self, n: u64) {
        self.bytes_sent.fetch_add(n, Ordering::Relaxed);
    }

    pub fn incr_packets_sent(&self) {
        self.packets_sent.fetch_add(1, Ordering::Relaxed);
    }

    pub fn incr_bytes_dropped(&self, n: u64) {
        self.bytes_dropped.fetch_add(n, Ordering::Relaxed);
    }

    pub fn incr_packets_dropped(&self) {
        self.packets_dropped.fetch_add(1, Ordering::Relaxed);
    }

    pub fn update(&self, res: io::Result<usize>, len: usize) -> io::Result<usize> {
        match res {
            Ok(written) => {
                self.incr_bytes_sent(written as u64);
                self.incr_packets_sent();
                Ok(written)
            }
            Err(e) => {
                self.incr_bytes_dropped(len as u64);
                self.incr_packets_dropped();
                Err(e)
            }
        }
    }
}

impl From<&SocketStats> for SinkStats {
    fn from(stats: &SocketStats) -> Self {
        SinkStats {
            bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
            packets_sent: stats.packets_sent.load(Ordering::Relaxed),
            bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
            packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
        }
    }
}

/// Trait for various backends that send Statsd metrics somewhere.
///
/// The metric string will be in the canonical format to be sent to a
/// Statsd server. The metric string will not include a trailing newline.
/// Examples of each supported metric type are given below.
///
/// ## Counter
///
/// ``` text
/// some.counter:123|c
/// ```
///
/// ## Timer
///
/// ``` text
/// some.timer:456|ms
/// ```
///
/// ## Gauge
///
/// ``` text
/// some.gauge:5|g
/// ```
///
/// ## Meter
///
/// ``` text
/// some.meter:8|m
/// ```
///
/// ## Histogram
///
/// ``` text
/// some.histogram:4|h
/// ```
///
/// ## Set
///
/// ``` text
/// some.set:2|s
/// ```
///
/// ## Distribution
///
/// ``` text
/// some.distribution:2|d
/// ```
///
/// See the [Statsd spec](https://github.com/b/statsd_spec) for more
/// information.
pub trait MetricSink {
    /// Send the Statsd metric using this sink and return the number of bytes
    /// written or an I/O error.
    ///
    /// Note that implementations may return `0` bytes if the metric is not
    /// immediately written (such as when it is buffered).  Callers should *NOT*
    /// interpret this as an error.
    fn emit(&self, metric: &str) -> io::Result<usize>;

    /// Flush any currently buffered metrics to the underlying backend, returning
    /// an I/O error if they could not be written for some reason.
    ///
    /// Note that not all sinks buffer metrics and so the default implementation of
    /// this method does nothing.
    fn flush(&self) -> io::Result<()> {
        Ok(())
    }

    /// Return I/O telemetry like bytes / packets sent or dropped.
    ///
    /// Note that not all sinks implement this method and the default implementation
    /// returns zeros.
    fn stats(&self) -> SinkStats {
        SinkStats::default()
    }
}

/// Implementation of a `MetricSink` that discards all metrics.
///
/// Useful for disabling metric collection or unit tests.
#[derive(Debug, Clone)]
pub struct NopMetricSink;

impl MetricSink for NopMetricSink {
    fn emit(&self, _metric: &str) -> io::Result<usize> {
        Ok(0)
    }
}

#[cfg(test)]
mod tests {
    use super::{MetricSink, NopMetricSink};
    #[test]
    fn test_nop_metric_sink() {
        let sink = NopMetricSink;
        assert_eq!(0, sink.emit("baz:4|c").unwrap());
    }
}