use crate::io::MultiLineWriter;
use crate::sinks::core::{MetricSink, SinkStats, SocketStats};
use crate::sinks::resolve::{PeriodicResolver, Resolver, StaticResolver};
use crate::types::MetricResult;
use std::io::{self, Write};
use std::net::{ToSocketAddrs, UdpSocket};
use std::panic::RefUnwindSafe;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{fmt, thread};
// Capacity handed to `MultiLineWriter::new` when the caller does not choose
// one: used by `BufferedUdpMetricSink::from` and as the fallback in
// `BufferedUdpMetricSinkBuilder::build`.
// NOTE(review): presumably measured in bytes -- confirm in `MultiLineWriter`.
const DEFAULT_BUFFER_SIZE: usize = 512;
fn get_resolver<A>(
addr: A,
period: Option<Duration>,
error_handler: Option<Box<dyn Fn(io::Error) + Sync + Send + RefUnwindSafe>>,
) -> MetricResult<Arc<dyn Resolver + Send + Sync + RefUnwindSafe>>
where
A: ToSocketAddrs + fmt::Debug + Send + Sync + RefUnwindSafe + 'static,
{
match period {
Some(duration) => {
let error_handler = error_handler.unwrap_or_else(|| Box::new(|_e| {}));
let sleep = |d| thread::sleep(d);
let resolver = Arc::new(PeriodicResolver::new(addr, duration, error_handler, sleep)?);
let resolver_c = resolver.clone();
crate::sync::execute(move || resolver_c.run());
Ok(resolver)
}
None => Ok(Arc::new(StaticResolver::new(addr)?)),
}
}
/// Builder for a `UdpMetricSink`.
///
/// Both settings are optional; a default builder has neither set.
#[derive(Default)]
pub struct UdpMetricSinkBuilder {
    /// Period handed to the resolver; `None` selects a static resolver.
    resolver_period: Option<Duration>,
    /// Callback given to the periodic resolver for I/O errors.
    resolver_error_handler: Option<Box<dyn Fn(io::Error) + Sync + Send + RefUnwindSafe>>,
}

impl fmt::Debug for UdpMetricSinkBuilder {
    /// Format the builder for diagnostics.
    ///
    /// The error handler is a boxed closure and cannot be printed, so only
    /// whether one is set is shown, using the same `"..."` placeholder the
    /// sinks in this file use for their opaque fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UdpMetricSinkBuilder")
            .field("resolver_period", &self.resolver_period)
            .field(
                "resolver_error_handler",
                &self.resolver_error_handler.as_ref().map(|_| "..."),
            )
            .finish()
    }
}
impl UdpMetricSinkBuilder {
    /// Create a builder with no resolver period and no error handler set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Consume the builder and create a `UdpMetricSink` that sends to
    /// `to_addr` through `socket`.
    ///
    /// The resolver comes from `get_resolver`, which picks a periodic or
    /// static resolver depending on whether a resolver period was configured.
    ///
    /// # Errors
    ///
    /// Returns the error from `get_resolver` if the resolver cannot be built.
    pub fn build<A>(self, to_addr: A, socket: UdpSocket) -> MetricResult<UdpMetricSink>
    where
        A: ToSocketAddrs + fmt::Debug + Send + Sync + RefUnwindSafe + 'static,
    {
        let Self {
            resolver_period,
            resolver_error_handler,
        } = self;
        let resolver = get_resolver(to_addr, resolver_period, resolver_error_handler)?;

        Ok(UdpMetricSink {
            resolver,
            socket,
            stats: SocketStats::default(),
        })
    }

    /// Set the resolver period; with a period set, `build` uses a periodic
    /// resolver instead of a static one.
    pub fn with_resolver_period(self, duration: Duration) -> Self {
        Self {
            resolver_period: Some(duration),
            ..self
        }
    }

    /// Set the callback that is passed to the periodic resolver as its error
    /// handler. It has no effect unless a resolver period is also set.
    pub fn with_resolver_error_handler<F>(self, error_handler: F) -> Self
    where
        F: Fn(io::Error) + Sync + Send + RefUnwindSafe + 'static,
    {
        Self {
            resolver_error_handler: Some(Box::new(error_handler)),
            ..self
        }
    }
}
/// Metric sink that sends every metric it is given over a `UdpSocket`,
/// addressed to whatever its resolver currently reports.
pub struct UdpMetricSink {
/// Supplies the destination address for each send; `stop` is called on it
/// when the sink is dropped.
resolver: Arc<dyn Resolver + Send + Sync + RefUnwindSafe>,
/// Socket the metrics are sent through.
socket: UdpSocket,
/// Updated with the outcome of every send and exposed through `stats()`.
stats: SocketStats,
}
impl UdpMetricSink {
    /// Return a builder for configuring a `UdpMetricSink`.
    pub fn builder() -> UdpMetricSinkBuilder {
        UdpMetricSinkBuilder::new()
    }

    /// Create a sink that sends to `to_addr` through `socket`, using a
    /// `StaticResolver` for the destination address.
    ///
    /// # Errors
    ///
    /// Returns the error from `StaticResolver::new` if the resolver cannot be
    /// created from `to_addr`.
    pub fn from<A>(to_addr: A, socket: UdpSocket) -> MetricResult<Self>
    where
        A: ToSocketAddrs,
    {
        let fixed = StaticResolver::new(to_addr)?;
        let resolver = Arc::new(fixed);

        Ok(Self {
            resolver,
            socket,
            stats: SocketStats::default(),
        })
    }
}
impl MetricSink for UdpMetricSink {
    /// Send `metric` to the resolver's current address and record the outcome.
    ///
    /// The result of `send_to` and the metric length are handed to
    /// `SocketStats::update`, whose return value is returned to the caller.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let destination = self.resolver.get_addr();
        let outcome = self.socket.send_to(metric.as_bytes(), destination);
        self.stats.update(outcome, metric.len())
    }

    /// Convert the sink's socket statistics into a `SinkStats` value.
    fn stats(&self) -> SinkStats {
        let socket_stats = &self.stats;
        socket_stats.into()
    }
}
impl Drop for UdpMetricSink {
    /// Call `stop` on the resolver when the sink goes away.
    ///
    /// NOTE(review): presumably this ends the periodic resolver's `run` loop
    /// -- confirm in `Resolver`.
    fn drop(&mut self) {
        let resolver = &self.resolver;
        resolver.stop();
    }
}
impl fmt::Debug for UdpMetricSink {
    /// Format the sink for diagnostics. The resolver is shown as the
    /// placeholder `"..."`; the socket and stats use their own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("UdpMetricSink");
        repr.field("resolver", &"...");
        repr.field("socket", &self.socket);
        repr.field("stats", &self.stats);
        repr.finish()
    }
}
/// Adapter that exposes a `UdpSocket` through the `Write` trait: each `write`
/// sends the given bytes to the address reported by the resolver.
pub(crate) struct UdpWriteAdapter {
/// Supplies the destination address for each send; `stop` is called on it
/// when the adapter is dropped.
resolver: Arc<dyn Resolver + Send + Sync + RefUnwindSafe>,
/// Socket the bytes are sent through.
socket: UdpSocket,
/// Updated with the outcome of every send.
stats: SocketStats,
}
impl UdpWriteAdapter {
pub(crate) fn new(
resolver: Arc<dyn Resolver + Send + Sync + RefUnwindSafe>,
socket: UdpSocket,
stats: SocketStats,
) -> Self {
Self {
resolver,
socket,
stats,
}
}
}
impl Write for UdpWriteAdapter {
    /// Send `buf` to the resolver's current address with a single `send_to`
    /// call and record the outcome.
    ///
    /// The result of `send_to` and the buffer length are handed to
    /// `SocketStats::update`, whose return value is returned to the caller.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let destination = self.resolver.get_addr();
        let outcome = self.socket.send_to(buf, destination);
        self.stats.update(outcome, buf.len())
    }

    /// Nothing is buffered by the adapter itself, so this always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl Drop for UdpWriteAdapter {
    /// Call `stop` on the resolver when the adapter goes away.
    ///
    /// NOTE(review): presumably this ends the periodic resolver's `run` loop
    /// -- confirm in `Resolver`.
    fn drop(&mut self) {
        let resolver = &self.resolver;
        resolver.stop();
    }
}
impl fmt::Debug for UdpWriteAdapter {
    /// Format the adapter for diagnostics. The resolver is shown as the
    /// placeholder `"..."`; the socket and stats use their own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("UdpWriteAdapter");
        repr.field("resolver", &"...");
        repr.field("socket", &self.socket);
        repr.field("stats", &self.stats);
        repr.finish()
    }
}
/// Builder for a `BufferedUdpMetricSink`.
///
/// All settings are optional; a default builder has none of them set.
#[derive(Default)]
pub struct BufferedUdpMetricSinkBuilder {
    /// Buffer capacity; `DEFAULT_BUFFER_SIZE` is used when `None`.
    capacity: Option<usize>,
    /// Period handed to the resolver; `None` selects a static resolver.
    resolver_period: Option<Duration>,
    /// Callback given to the periodic resolver for I/O errors.
    resolver_error_handler: Option<Box<dyn Fn(io::Error) + Sync + Send + RefUnwindSafe>>,
}

impl fmt::Debug for BufferedUdpMetricSinkBuilder {
    /// Format the builder for diagnostics.
    ///
    /// The error handler is a boxed closure and cannot be printed, so only
    /// whether one is set is shown, using the same `"..."` placeholder the
    /// sinks in this file use for their opaque fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufferedUdpMetricSinkBuilder")
            .field("capacity", &self.capacity)
            .field("resolver_period", &self.resolver_period)
            .field(
                "resolver_error_handler",
                &self.resolver_error_handler.as_ref().map(|_| "..."),
            )
            .finish()
    }
}
impl BufferedUdpMetricSinkBuilder {
    /// Create a builder with no capacity, resolver period or error handler set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Consume the builder and create a `BufferedUdpMetricSink` that sends to
    /// `to_addr` through `socket`.
    ///
    /// The resolver comes from `get_resolver`; the buffer capacity is the
    /// configured one or `DEFAULT_BUFFER_SIZE`. The write adapter receives a
    /// clone of the stats value kept by the sink (presumably sharing the same
    /// counters -- confirm in `SocketStats`).
    ///
    /// # Errors
    ///
    /// Returns the error from `get_resolver` if the resolver cannot be built.
    pub fn build<A>(self, to_addr: A, socket: UdpSocket) -> MetricResult<BufferedUdpMetricSink>
    where
        A: ToSocketAddrs + fmt::Debug + Send + Sync + RefUnwindSafe + 'static,
    {
        let Self {
            capacity,
            resolver_period,
            resolver_error_handler,
        } = self;
        let resolver = get_resolver(to_addr, resolver_period, resolver_error_handler)?;
        let stats = SocketStats::default();
        let adapter = UdpWriteAdapter::new(resolver, socket, stats.clone());
        let writer = MultiLineWriter::new(adapter, capacity.unwrap_or(DEFAULT_BUFFER_SIZE));

        Ok(BufferedUdpMetricSink {
            buffer: Mutex::new(writer),
            stats,
        })
    }

    /// Set the buffer capacity used instead of `DEFAULT_BUFFER_SIZE`.
    pub fn with_capacity(self, capacity: usize) -> Self {
        Self {
            capacity: Some(capacity),
            ..self
        }
    }

    /// Set the resolver period; with a period set, `build` uses a periodic
    /// resolver instead of a static one.
    pub fn with_resolver_period(self, duration: Duration) -> Self {
        Self {
            resolver_period: Some(duration),
            ..self
        }
    }

    /// Set the callback that is passed to the periodic resolver as its error
    /// handler. It has no effect unless a resolver period is also set.
    pub fn with_resolver_error_handler<F>(self, error_handler: F) -> Self
    where
        F: Fn(io::Error) + Sync + Send + RefUnwindSafe + 'static,
    {
        Self {
            resolver_error_handler: Some(Box::new(error_handler)),
            ..self
        }
    }
}
/// Metric sink that writes metrics into a `MultiLineWriter` wrapped around a
/// `UdpWriteAdapter`, instead of sending each metric directly.
#[derive(Debug)]
pub struct BufferedUdpMetricSink {
/// Buffered writer, guarded by a mutex because `emit` and `flush` only
/// receive `&self` but need mutable access to it.
buffer: Mutex<MultiLineWriter<UdpWriteAdapter>>,
/// Stats value exposed through `stats()`; a clone of it is handed to the
/// write adapter when the sink is constructed.
stats: SocketStats,
}
impl BufferedUdpMetricSink {
    /// Create a buffered sink that sends to `sink_addr` through `socket`,
    /// using `DEFAULT_BUFFER_SIZE` as the buffer capacity.
    ///
    /// # Errors
    ///
    /// Returns whatever `with_capacity` returns on failure.
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<Self>
    where
        A: ToSocketAddrs,
    {
        Self::with_capacity(sink_addr, socket, DEFAULT_BUFFER_SIZE)
    }

    /// Create a buffered sink that sends to `to_addr` through `socket` with a
    /// buffer capacity of `cap`, using a `StaticResolver` for the destination.
    ///
    /// The write adapter receives a clone of the stats value kept by the sink
    /// (presumably sharing the same counters -- confirm in `SocketStats`).
    ///
    /// # Errors
    ///
    /// Returns the error from `StaticResolver::new` if the resolver cannot be
    /// created from `to_addr`.
    pub fn with_capacity<A>(to_addr: A, socket: UdpSocket, cap: usize) -> MetricResult<Self>
    where
        A: ToSocketAddrs,
    {
        let resolver = Arc::new(StaticResolver::new(to_addr)?);
        let stats = SocketStats::default();
        let adapter = UdpWriteAdapter::new(resolver, socket, stats.clone());
        let writer = MultiLineWriter::new(adapter, cap);

        Ok(BufferedUdpMetricSink {
            buffer: Mutex::new(writer),
            stats,
        })
    }

    /// Return a builder for configuring a `BufferedUdpMetricSink`.
    pub fn builder() -> BufferedUdpMetricSinkBuilder {
        BufferedUdpMetricSinkBuilder::new()
    }
}
impl MetricSink for BufferedUdpMetricSink {
    /// Write `metric` into the buffered writer and return the writer's result.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let payload = metric.as_bytes();
        let mut guard = self.buffer.lock().unwrap();
        guard.write(payload)
    }

    /// Flush the buffered writer.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    fn flush(&self) -> io::Result<()> {
        let mut guard = self.buffer.lock().unwrap();
        guard.flush()
    }

    /// Convert the sink's socket statistics into a `SinkStats` value.
    fn stats(&self) -> SinkStats {
        let socket_stats = &self.stats;
        socket_stats.into()
    }
}
#[cfg(test)]
mod tests {
    use super::{BufferedUdpMetricSink, MetricSink, UdpMetricSink};
    use std::net::UdpSocket;

    /// Destination address used by every test in this module.
    const SINK_ADDR: &str = "127.0.0.1:8125";

    /// Bind a UDP socket to `0.0.0.0` on port 0, panicking on failure.
    fn any_port_socket() -> UdpSocket {
        UdpSocket::bind("0.0.0.0:0").unwrap()
    }

    /// Build a buffered sink with the given capacity on a fresh socket.
    fn buffered_sink(capacity: usize) -> BufferedUdpMetricSink {
        BufferedUdpMetricSink::with_capacity(SINK_ADDR, any_port_socket(), capacity).unwrap()
    }

    // An unbuffered sink reports the metric's byte length on emit.
    #[test]
    fn test_udp_metric_sink() {
        let sink = UdpMetricSink::from(SINK_ADDR, any_port_socket()).unwrap();
        let sent = sink.emit("buz:1|m").unwrap();
        assert_eq!(7, sent);
    }

    // The same holds when the socket has been switched to non-blocking mode.
    #[test]
    fn test_non_blocking_udp_metric_sink() {
        let socket = any_port_socket();
        socket.set_nonblocking(true).unwrap();
        let sink = UdpMetricSink::from(SINK_ADDR, socket).unwrap();
        let sent = sink.emit("baz:1|m").unwrap();
        assert_eq!(7, sent);
    }

    // Each emit on a buffered sink reports the length of the metric written.
    #[test]
    fn test_buffered_udp_metric_sink() {
        let sink = buffered_sink(16);
        for metric in ["foo:54|c", "foo:67|c"].iter() {
            assert_eq!(8, sink.emit(metric).unwrap());
        }
    }

    // Flushing after an emit succeeds.
    #[test]
    fn test_buffered_udp_metric_sink_flush() {
        let sink = buffered_sink(64);
        assert_eq!(8, sink.emit("foo:54|c").unwrap());
        assert!(sink.flush().is_ok());
    }

    // After emitting and flushing, the sink's stats show bytes and packets sent.
    #[test]
    fn test_buffered_udp_metric_sink_stats() {
        let sink = buffered_sink(16);
        sink.emit("foo:54|c").unwrap();
        sink.emit("foo:67|c").unwrap();
        sink.flush().unwrap();

        let snapshot = sink.stats();
        assert!(
            snapshot.bytes_sent > 0,
            "Expected bytes_sent > 0, got {}",
            snapshot.bytes_sent
        );
        assert!(
            snapshot.packets_sent > 0,
            "Expected packets_sent > 0, got {}",
            snapshot.packets_sent
        );
    }
}