cadence_with_flush/sinks/unix.rs
1// Cadence - An extensible Statsd client for Rust!
2//
3// Copyright 2019 Daniel Smith
4// Copyright 2019-2021 Nick Pillitteri
5//
6// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
7// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
9// option. This file may not be copied, modified, or distributed
10// except according to those terms.
11
12use std::io;
13use std::io::Write;
14use std::os::unix::net::UnixDatagram;
15use std::path::{Path, PathBuf};
16use std::sync::Mutex;
17
18use crate::io::MultiLineWriter;
19use crate::sinks::core::MetricSink;
20
// Buffer size, in bytes, used by `BufferedUnixMetricSink::from` when the
// caller does not pick a capacity via `with_capacity`.
//
// This is a rather conservative value, picked for consistency with
// the UDP implementation. Users may want to use a different
// value based on the configuration of the server their
// application is running on.
const DEFAULT_BUFFER_SIZE: usize = 512;
27
/// A `MetricSink` that writes each metric straight to a Unix datagram socket.
///
/// This is the simplest of the Unix socket sinks. It holds the socket that
/// metrics are sent from together with the filesystem path of the socket
/// the Statsd server receives on, and performs no buffering of its own.
///
/// A metric is handed to the socket as soon as `.emit()` is invoked, on the
/// thread of the caller.
///
/// Note that, unlike the UDP sinks, emitting a metric returns an error when
/// there is no receiving socket at the configured path or nothing is
/// listening on it.
#[derive(Debug)]
pub struct UnixMetricSink {
    socket: UnixDatagram,
    path: PathBuf,
}

impl UnixMetricSink {
    /// Build a `UnixMetricSink` that sends to the socket found at `path`.
    ///
    /// An unbound socket (i.e. `UnixDatagram::unbound()`) is perfectly
    /// acceptable. Whatever configuration is wanted (blocking vs
    /// non-blocking, timeouts, etc.) should be applied to the socket
    /// before it is handed to this method.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::os::unix::net::UnixDatagram;
    /// use cadence::UnixMetricSink;
    ///
    /// let socket = UnixDatagram::unbound().unwrap();
    /// let sink = UnixMetricSink::from("/run/statsd.sock", socket);
    /// ```
    ///
    /// Sending metrics over a non-blocking socket only requires switching
    /// the socket to non-blocking mode before the sink is created.
    ///
    /// # Non-blocking Example
    ///
    /// ```no_run
    /// use std::os::unix::net::UnixDatagram;
    /// use cadence::UnixMetricSink;
    ///
    /// let socket = UnixDatagram::unbound().unwrap();
    /// socket.set_nonblocking(true).unwrap();
    /// let sink = UnixMetricSink::from("/run/statsd.sock", socket);
    /// ```
    pub fn from<P>(path: P, socket: UnixDatagram) -> UnixMetricSink
    where
        P: AsRef<Path>,
    {
        // Keep an owned copy of the destination so the sink does not
        // borrow from the caller.
        let path = path.as_ref().to_owned();
        Self { socket, path }
    }
}
86
87impl MetricSink for UnixMetricSink {
88 fn emit(&self, metric: &str) -> io::Result<usize> {
89 self.socket.send_to(metric.as_bytes(), self.path.as_path())
90 }
91}
92
/// Exposes a `UnixDatagram` socket through the `Write` trait by sending
/// every write to one fixed destination path.
#[derive(Debug)]
pub(crate) struct UnixWriteAdapter {
    path: PathBuf,
    socket: UnixDatagram,
}

impl UnixWriteAdapter {
    /// Create an adapter that sends from `socket` to the socket at `path`.
    ///
    /// The path is copied into an owned `PathBuf` held by the adapter.
    fn new<P>(socket: UnixDatagram, path: P) -> UnixWriteAdapter
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref().to_owned();
        Self { path, socket }
    }
}

impl Write for UnixWriteAdapter {
    /// Send `buf` to the destination path with one `send_to` call and
    /// return that call's result unchanged.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let destination = self.path.as_path();
        self.socket.send_to(buf, destination)
    }

    /// The adapter itself holds no pending data, so flushing always
    /// succeeds without doing any work.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
121
122/// Implementation of a `MetricSink` that buffers metrics before
123/// sending them to a Unix socket.
124///
125/// Metrics are line buffered, meaning that a trailing "\n" is added
126/// after each metric written to this sink. When the buffer is sufficiently
127/// full and a write is attempted, the contents of the buffer are flushed to
128/// a Unix socket and then the metric is written to the buffer. The buffer is
129/// also flushed when this sink is destroyed.
130///
131/// The default size of the buffer is 512 bytes. This is to be consistent with
132/// the default for the `BufferedUdpMetricSink`. The buffer size can be customized
133/// using the `with_capacity` method to create the sink if desired.
134///
135/// If a metric larger than the buffer is emitted, it will be written
136/// directly to the underlying Unix socket, bypassing the buffer.
137///
138/// Note that since metrics are buffered until a certain size is reached, it's
139/// possible that they may sit in the buffer for a while for applications
140/// that do not emit metrics frequently or at a high volume. For these low-
141/// throughput use cases, it may make more sense to use the `UnixMetricSink`
142/// since it sends metrics immediately with no buffering.
143///
144/// Also note that unlike the UDP sinks, if there is no receiving socket at the path
145/// specified or nothing listening at the path, an error will be returned when
146/// metrics are emitted (though this may not happen on every write due to buffering).
147#[derive(Debug)]
148pub struct BufferedUnixMetricSink {
149 buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
150}
151
152impl BufferedUnixMetricSink {
153 /// Construct a new `BufferedUnixMetricSink` instance with a default
154 /// buffer size of 512 bytes.
155 ///
156 /// The socket does not need to be bound (i.e. `UnixDatagram::unbound()` is
157 /// fine) but should have any desired configuration already applied
158 /// (blocking vs non-blocking, timeouts, etc.).
159 ///
160 /// Writes to this sink are automatically suffixed with a Unix newline
161 /// ('\n') by the sink and stored in a 512 byte buffer until the buffer
162 /// is full or this sink is destroyed, at which point the buffer will be
163 /// flushed.
164 ///
165 /// # Example
166 ///
167 /// ```no_run
168 /// use std::os::unix::net::UnixDatagram;
169 /// use cadence::BufferedUnixMetricSink;
170 ///
171 /// let socket = UnixDatagram::unbound().unwrap();
172 /// let sink = BufferedUnixMetricSink::from("/run/statsd.sock", socket);
173 /// ```
174 pub fn from<P>(path: P, socket: UnixDatagram) -> BufferedUnixMetricSink
175 where
176 P: AsRef<Path>,
177 {
178 Self::with_capacity(path, socket, DEFAULT_BUFFER_SIZE)
179 }
180
181 /// Construct a new `BufferedUnixMetricSink` instance with a custom
182 /// buffer size.
183 ///
184 /// The socket does not need to be bound (i.e. `UnixDatagram::unbound()` is
185 /// fine) but should have with any desired configuration already applied
186 /// (blocking vs non-blocking, timeouts, etc.).
187 ///
188 /// Writes to this sink are automatically suffixed with a Unix newline
189 /// ('\n') by the sink and stored in a buffer until the buffer is full
190 /// or this sink is destroyed, at which point the buffer will be flushed.
191 ///
192 /// # Example
193 ///
194 /// ```no_run
195 /// use std::os::unix::net::UnixDatagram;
196 /// use cadence::BufferedUnixMetricSink;
197 ///
198 /// let socket = UnixDatagram::unbound().unwrap();
199 /// let sink = BufferedUnixMetricSink::with_capacity("/run/statsd.sock", socket, 1432);
200 /// ```
201 pub fn with_capacity<P>(path: P, socket: UnixDatagram, cap: usize) -> BufferedUnixMetricSink
202 where
203 P: AsRef<Path>,
204 {
205 BufferedUnixMetricSink {
206 buffer: Mutex::new(MultiLineWriter::new(UnixWriteAdapter::new(socket, path), cap)),
207 }
208 }
209}
210
211impl MetricSink for BufferedUnixMetricSink {
212 fn emit(&self, metric: &str) -> io::Result<usize> {
213 let mut writer = self.buffer.lock().unwrap();
214 writer.write(metric.as_bytes())
215 }
216
217 fn flush(&self) -> io::Result<()> {
218 let mut writer = self.buffer.lock().unwrap();
219 writer.flush()
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::{BufferedUnixMetricSink, MetricSink, UnixMetricSink};
    use crate::test::UnixServerHarness;
    use std::os::unix::net::UnixDatagram;

    /// Create the unbound client socket that a test sends its metrics from.
    fn client_socket() -> UnixDatagram {
        UnixDatagram::unbound().unwrap()
    }

    /// A metric emitted over a blocking socket reports its full length.
    #[test]
    fn test_unix_metric_sink() {
        let server = UnixServerHarness::new("test_unix_metric_sink");

        server.run_quiet(|path| {
            let sink = UnixMetricSink::from(path, client_socket());
            let written = sink.emit("buz:1|m").unwrap();

            assert_eq!(7, written);
        });
    }

    /// A metric emitted over a non-blocking socket reports its full length.
    #[test]
    fn test_non_blocking_unix_metric_sink() {
        let server = UnixServerHarness::new("test_non_blocking_unix_metric_sink");

        server.run_quiet(|path| {
            let socket = client_socket();
            socket.set_nonblocking(true).unwrap();

            let sink = UnixMetricSink::from(path, socket);
            let written = sink.emit("baz:1|m").unwrap();

            assert_eq!(7, written);
        });
    }

    /// Two writes against a small buffer both report the metric length.
    #[test]
    fn test_buffered_unix_metric_sink() {
        let server = UnixServerHarness::new("test_buffered_unix_metric_sink");

        server.run_quiet(|path| {
            // The capacity is chosen so that the metrics written below are
            // known to trigger a flush of the buffer.
            let sink = BufferedUnixMetricSink::with_capacity(path, client_socket(), 16);

            let first = sink.emit("foo:54|c").unwrap();
            let second = sink.emit("foo:67|c").unwrap();

            assert_eq!(8, first);
            assert_eq!(8, second);
        });
    }

    /// An explicit flush succeeds after a write that stays in the buffer.
    #[test]
    fn test_buffered_unix_metric_sink_flush() {
        let server = UnixServerHarness::new("test_buffered_unix_metric_sink_flush");

        server.run_quiet(|path| {
            // The capacity is chosen so that a single write does not flush
            // the buffer, which lets the flush method itself be exercised.
            let sink = BufferedUnixMetricSink::with_capacity(path, client_socket(), 16);

            let written = sink.emit("foo:54|c").unwrap();

            assert_eq!(8, written);
            assert!(sink.flush().is_ok());
        });
    }
}