use std::io;
use std::io::Write;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use crate::io::MultiLineWriter;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};
const DEFAULT_BUFFER_SIZE: usize = 512;
#[derive(Debug)]
pub struct UnixMetricSink {
socket: UnixDatagram,
path: PathBuf,
stats: SocketStats,
}
impl UnixMetricSink {
pub fn from<P>(path: P, socket: UnixDatagram) -> UnixMetricSink
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
UnixMetricSink {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}
impl MetricSink for UnixMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.stats.update(
self.socket.send_to(metric.as_bytes(), self.path.as_path()),
metric.len(),
)
}
fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}
#[derive(Debug)]
pub(crate) struct UnixWriteAdapter {
path: PathBuf,
socket: UnixDatagram,
stats: SocketStats,
}
impl UnixWriteAdapter {
fn new<P>(socket: UnixDatagram, path: P, stats: SocketStats) -> UnixWriteAdapter
where
P: AsRef<Path>,
{
UnixWriteAdapter {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}
impl Write for UnixWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stats.update(self.socket.send_to(buf, &self.path), buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[derive(Debug)]
pub struct BufferedUnixMetricSink {
buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
stats: SocketStats,
}
impl BufferedUnixMetricSink {
pub fn from<P>(path: P, socket: UnixDatagram) -> BufferedUnixMetricSink
where
P: AsRef<Path>,
{
Self::with_capacity(path, socket, DEFAULT_BUFFER_SIZE)
}
pub fn with_capacity<P>(path: P, socket: UnixDatagram, cap: usize) -> BufferedUnixMetricSink
where
P: AsRef<Path>,
{
let stats = SocketStats::default();
BufferedUnixMetricSink {
buffer: Mutex::new(MultiLineWriter::new(
UnixWriteAdapter::new(socket, path, stats.clone()),
cap,
)),
stats,
}
}
}
impl MetricSink for BufferedUnixMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
let mut writer = self.buffer.lock().unwrap();
writer.write(metric.as_bytes())
}
fn flush(&self) -> io::Result<()> {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}
fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}
#[cfg(test)]
mod tests {
use super::{BufferedUnixMetricSink, MetricSink, UnixMetricSink};
use crate::test::UnixServerHarness;
use std::os::unix::net::UnixDatagram;
#[test]
fn test_unix_metric_sink() {
let harness = UnixServerHarness::new("test_unix_metric_sink");
harness.run_quiet(|path| {
let socket = UnixDatagram::unbound().unwrap();
let sink = UnixMetricSink::from(path, socket);
assert_eq!(7, sink.emit("buz:1|m").unwrap());
});
}
#[test]
fn test_non_blocking_unix_metric_sink() {
let harness = UnixServerHarness::new("test_non_blocking_unix_metric_sink");
harness.run_quiet(|path| {
let socket = UnixDatagram::unbound().unwrap();
socket.set_nonblocking(true).unwrap();
let sink = UnixMetricSink::from(path, socket);
assert_eq!(7, sink.emit("baz:1|m").unwrap());
});
}
#[test]
fn test_buffered_unix_metric_sink() {
let harness = UnixServerHarness::new("test_buffered_unix_metric_sink");
harness.run_quiet(|path| {
let socket = UnixDatagram::unbound().unwrap();
let sink = BufferedUnixMetricSink::with_capacity(path, socket, 16);
assert_eq!(8, sink.emit("foo:54|c").unwrap());
assert_eq!(8, sink.emit("foo:67|c").unwrap());
});
}
#[test]
fn test_buffered_unix_metric_sink_flush() {
let harness = UnixServerHarness::new("test_buffered_unix_metric_sink_flush");
harness.run_quiet(|path| {
let socket = UnixDatagram::unbound().unwrap();
let sink = BufferedUnixMetricSink::with_capacity(path, socket, 16);
assert_eq!(8, sink.emit("foo:54|c").unwrap());
assert!(sink.flush().is_ok());
});
}
}