cadence_with_flush/sinks/
unix.rs

1// Cadence - An extensible Statsd client for Rust!
2//
3// Copyright 2019 Daniel Smith
4// Copyright 2019-2021 Nick Pillitteri
5//
6// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
7// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
9// option. This file may not be copied, modified, or distributed
10// except according to those terms.
11
12use std::io;
13use std::io::Write;
14use std::os::unix::net::UnixDatagram;
15use std::path::{Path, PathBuf};
16use std::sync::Mutex;
17
18use crate::io::MultiLineWriter;
19use crate::sinks::core::MetricSink;
20
// Default capacity, in bytes, of the buffer used by the buffered
// metric sink when the caller does not pick one. This is a rather
// conservative value, picked for consistency with the UDP
// implementation. Users may want to use a different value based on
// the configuration of the server their application is running on.
const DEFAULT_BUFFER_SIZE: usize = 512;
27
/// A `MetricSink` that writes each metric straight to a Unix datagram socket.
///
/// This is the simplest of the Unix socket sinks. It holds a `UnixDatagram`
/// to send from and the filesystem path of the socket that the Statsd
/// server is reading from.
///
/// Nothing is buffered: every call to `.emit()` results in one send,
/// performed in the thread of the caller.
///
/// Note that unlike the UDP sinks, an error is returned when metrics are
/// emitted if there is no receiving socket at the configured path or nothing
/// is listening on it.
#[derive(Debug)]
pub struct UnixMetricSink {
    // Socket that metrics are sent from.
    socket: UnixDatagram,
    // Path of the socket that metrics are sent to.
    path: PathBuf,
}

impl UnixMetricSink {
    /// Create a `UnixMetricSink` that sends to the socket at `path`
    /// using `socket`.
    ///
    /// The socket may be unbound (`UnixDatagram::unbound()` works), but
    /// any configuration you care about (blocking vs non-blocking,
    /// timeouts, etc.) must be applied before handing it to the sink.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::os::unix::net::UnixDatagram;
    /// use cadence::UnixMetricSink;
    ///
    /// let socket = UnixDatagram::unbound().unwrap();
    /// let sink = UnixMetricSink::from("/run/statsd.sock", socket);
    /// ```
    ///
    /// For non-blocking sends, switch the socket to non-blocking mode
    /// before constructing the sink.
    ///
    /// # Non-blocking Example
    ///
    /// ```no_run
    /// use std::os::unix::net::UnixDatagram;
    /// use cadence::UnixMetricSink;
    ///
    /// let socket = UnixDatagram::unbound().unwrap();
    /// socket.set_nonblocking(true).unwrap();
    /// let sink = UnixMetricSink::from("/run/statsd.sock", socket);
    /// ```
    pub fn from<P>(path: P, socket: UnixDatagram) -> UnixMetricSink
    where
        P: AsRef<Path>,
    {
        // Take an owned copy so the sink does not borrow from the caller.
        let path = PathBuf::from(path.as_ref());
        Self { socket, path }
    }
}
86
87impl MetricSink for UnixMetricSink {
88    fn emit(&self, metric: &str) -> io::Result<usize> {
89        self.socket.send_to(metric.as_bytes(), self.path.as_path())
90    }
91}
92
/// Adapter exposing a `UnixDatagram` socket through the `Write` trait.
///
/// Each `write` call is passed to `send_to`, addressed to the stored path.
#[derive(Debug)]
pub(crate) struct UnixWriteAdapter {
    // Path of the socket that written bytes are sent to.
    path: PathBuf,
    // Socket that written bytes are sent from.
    socket: UnixDatagram,
}

impl UnixWriteAdapter {
    /// Build an adapter that sends from `socket` to the socket at `path`.
    fn new<P>(socket: UnixDatagram, path: P) -> UnixWriteAdapter
    where
        P: AsRef<Path>,
    {
        // Take an owned copy so the adapter does not borrow from the caller.
        let path = PathBuf::from(path.as_ref());
        Self { path, socket }
    }
}

impl Write for UnixWriteAdapter {
    /// Send `buf` to the destination path, returning whatever `send_to`
    /// reports (bytes sent, or the I/O error).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let destination = self.path.as_path();
        self.socket.send_to(buf, destination)
    }

    /// The adapter keeps no buffer of its own, so this always succeeds
    /// without doing anything.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
121
122/// Implementation of a `MetricSink` that buffers metrics before
123/// sending them to a Unix socket.
124///
125/// Metrics are line buffered, meaning that a trailing "\n" is added
126/// after each metric written to this sink. When the buffer is sufficiently
127/// full and a write is attempted, the contents of the buffer are flushed to
128/// a Unix socket and then the metric is written to the buffer. The buffer is
129/// also flushed when this sink is destroyed.
130///
131/// The default size of the buffer is 512 bytes. This is to be consistent with
132/// the default for the `BufferedUdpMetricSink`. The buffer size can be customized
133/// using the `with_capacity` method to create the sink if desired.
134///
135/// If a metric larger than the buffer is emitted, it will be written
136/// directly to the underlying Unix socket, bypassing the buffer.
137///
138/// Note that since metrics are buffered until a certain size is reached, it's
139/// possible that they may sit in the buffer for a while for applications
140/// that do not emit metrics frequently or at a high volume. For these low-
141/// throughput use cases, it may make more sense to use the `UnixMetricSink`
142/// since it sends metrics immediately with no buffering.
143///
144/// Also note that unlike the UDP sinks, if there is no receiving socket at the path
145/// specified or nothing listening at the path, an error will be returned when
146/// metrics are emitted (though this may not happen on every write due to buffering).
147#[derive(Debug)]
148pub struct BufferedUnixMetricSink {
149    buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
150}
151
152impl BufferedUnixMetricSink {
153    /// Construct a new `BufferedUnixMetricSink` instance with a default
154    /// buffer size of 512 bytes.
155    ///
156    /// The socket does not need to be bound (i.e. `UnixDatagram::unbound()` is
157    /// fine) but should have any desired configuration already applied
158    /// (blocking vs non-blocking, timeouts, etc.).
159    ///
160    /// Writes to this sink are automatically suffixed with a Unix newline
161    /// ('\n') by the sink and stored in a 512 byte buffer until the buffer
162    /// is full or this sink is destroyed, at which point the buffer will be
163    /// flushed.
164    ///
165    /// # Example
166    ///
167    /// ```no_run
168    /// use std::os::unix::net::UnixDatagram;
169    /// use cadence::BufferedUnixMetricSink;
170    ///
171    /// let socket = UnixDatagram::unbound().unwrap();
172    /// let sink = BufferedUnixMetricSink::from("/run/statsd.sock", socket);
173    /// ```
174    pub fn from<P>(path: P, socket: UnixDatagram) -> BufferedUnixMetricSink
175    where
176        P: AsRef<Path>,
177    {
178        Self::with_capacity(path, socket, DEFAULT_BUFFER_SIZE)
179    }
180
181    /// Construct a new `BufferedUnixMetricSink` instance with a custom
182    /// buffer size.
183    ///
184    /// The socket does not need to be bound (i.e. `UnixDatagram::unbound()` is
185    /// fine) but should have with any desired configuration already applied
186    /// (blocking vs non-blocking, timeouts, etc.).
187    ///
188    /// Writes to this sink are automatically suffixed  with a Unix newline
189    /// ('\n') by the sink and stored in a buffer until the buffer is full
190    /// or this sink is destroyed, at which point the buffer will be flushed.
191    ///
192    /// # Example
193    ///
194    /// ```no_run
195    /// use std::os::unix::net::UnixDatagram;
196    /// use cadence::BufferedUnixMetricSink;
197    ///
198    /// let socket = UnixDatagram::unbound().unwrap();
199    /// let sink = BufferedUnixMetricSink::with_capacity("/run/statsd.sock", socket, 1432);
200    /// ```
201    pub fn with_capacity<P>(path: P, socket: UnixDatagram, cap: usize) -> BufferedUnixMetricSink
202    where
203        P: AsRef<Path>,
204    {
205        BufferedUnixMetricSink {
206            buffer: Mutex::new(MultiLineWriter::new(UnixWriteAdapter::new(socket, path), cap)),
207        }
208    }
209}
210
211impl MetricSink for BufferedUnixMetricSink {
212    fn emit(&self, metric: &str) -> io::Result<usize> {
213        let mut writer = self.buffer.lock().unwrap();
214        writer.write(metric.as_bytes())
215    }
216
217    fn flush(&self) -> io::Result<()> {
218        let mut writer = self.buffer.lock().unwrap();
219        writer.flush()
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::{BufferedUnixMetricSink, MetricSink, UnixMetricSink};
    use crate::test::UnixServerHarness;
    use std::os::unix::net::UnixDatagram;

    // Unbuffered sink over a blocking socket: emit reports the metric length.
    #[test]
    fn test_unix_metric_sink() {
        let server = UnixServerHarness::new("test_unix_metric_sink");

        server.run_quiet(|path| {
            let sink = UnixMetricSink::from(path, UnixDatagram::unbound().unwrap());

            let sent = sink.emit("buz:1|m").unwrap();
            assert_eq!(7, sent);
        });
    }

    // Same as above, but with the socket switched to non-blocking mode first.
    #[test]
    fn test_non_blocking_unix_metric_sink() {
        let server = UnixServerHarness::new("test_non_blocking_unix_metric_sink");

        server.run_quiet(|path| {
            let datagram = UnixDatagram::unbound().unwrap();
            datagram.set_nonblocking(true).unwrap();
            let sink = UnixMetricSink::from(path, datagram);

            let sent = sink.emit("baz:1|m").unwrap();
            assert_eq!(7, sent);
        });
    }

    #[test]
    fn test_buffered_unix_metric_sink() {
        let server = UnixServerHarness::new("test_buffered_unix_metric_sink");

        server.run_quiet(|path| {
            // The buffer capacity is chosen so that we know the buffer will
            // be flushed as a response to the metrics written below.
            let sink =
                BufferedUnixMetricSink::with_capacity(path, UnixDatagram::unbound().unwrap(), 16);

            let first = sink.emit("foo:54|c").unwrap();
            assert_eq!(8, first);
            let second = sink.emit("foo:67|c").unwrap();
            assert_eq!(8, second);
        });
    }

    #[test]
    fn test_buffered_unix_metric_sink_flush() {
        let server = UnixServerHarness::new("test_buffered_unix_metric_sink_flush");

        server.run_quiet(|path| {
            // The buffer capacity is chosen so that a single write does not
            // flush it, which lets us exercise the flush method explicitly.
            let sink =
                BufferedUnixMetricSink::with_capacity(path, UnixDatagram::unbound().unwrap(), 16);

            let written = sink.emit("foo:54|c").unwrap();
            assert_eq!(8, written);
            assert!(sink.flush().is_ok());
        });
    }
}