use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TrySendError};
use std::io::{self, ErrorKind, Write};
use std::sync::Mutex;
/// Buffer size handed to `MultiLineWriter` when a `BufferedSpyMetricSink` is
/// created without an explicit buffer size.
// NOTE(review): presumably measured in bytes — confirm against `MultiLineWriter::new`.
const DEFAULT_BUFFER_SIZE: usize = 512;
/// `MetricSink` implementation that copies every emitted metric into a
/// `Vec<u8>` and sends it over a channel, so the metric can be inspected
/// through the `Receiver` returned by the constructors.
#[derive(Debug)]
pub struct SpyMetricSink {
/// Sending half of the channel; the receiving half is returned to the caller
/// when the sink is constructed.
sender: Sender<Vec<u8>>,
}
impl SpyMetricSink {
    /// Create a sink backed by an unbounded channel.
    ///
    /// Returns the receiving half of the channel together with the sink.
    pub fn new() -> (Receiver<Vec<u8>>, Self) {
        Self::with_queue_capacity(None)
    }

    /// Create a sink backed by a bounded channel holding at most `queue`
    /// pending metrics.
    ///
    /// Returns the receiving half of the channel together with the sink.
    /// Emitting while the channel is full fails with a "channel full" error.
    pub fn with_capacity(queue: usize) -> (Receiver<Vec<u8>>, Self) {
        Self::with_queue_capacity(Some(queue))
    }

    /// Shared constructor: `None` selects an unbounded channel, `Some(n)` a
    /// bounded one of capacity `n`.
    fn with_queue_capacity(queue: Option<usize>) -> (Receiver<Vec<u8>>, Self) {
        let (sender, receiver) = new_channel(queue);
        (receiver, Self { sender })
    }
}
impl MetricSink for SpyMetricSink {
    /// Send the UTF-8 bytes of `metric` over the channel as a single message.
    ///
    /// Returns the number of bytes sent, or an `io::Error` when the channel
    /// is full or disconnected.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let payload = metric.as_bytes();
        send_metric(&self.sender, payload)
    }
}
/// `MetricSink` implementation that writes metrics through a
/// `MultiLineWriter` before they reach the channel, so they can be inspected
/// through the `Receiver` returned by the constructors.
///
/// As exercised by the tests in this file, several buffered metrics arrive on
/// the receiver as one newline-separated message.
#[derive(Debug)]
pub struct BufferedSpyMetricSink {
/// Buffering writer wrapped in a `Mutex` so it can be mutated from the
/// `&self` methods of `MetricSink`.
writer: Mutex<MultiLineWriter<WriteAdapter>>,
}
impl BufferedSpyMetricSink {
    /// Create a sink backed by an unbounded channel and a buffer of
    /// `DEFAULT_BUFFER_SIZE`.
    ///
    /// Returns the receiving half of the channel together with the sink.
    pub fn new() -> (Receiver<Vec<u8>>, Self) {
        Self::with_capacity(None, Some(DEFAULT_BUFFER_SIZE))
    }

    /// Create a sink with an explicit channel capacity and buffer size.
    ///
    /// * `queue` - `Some(n)` for a bounded channel of capacity `n`, `None`
    ///   for an unbounded channel.
    /// * `buffer` - size passed to the `MultiLineWriter`; `None` falls back
    ///   to `DEFAULT_BUFFER_SIZE`.
    ///
    /// Returns the receiving half of the channel together with the sink.
    pub fn with_capacity(
        queue: Option<usize>,
        buffer: Option<usize>,
    ) -> (Receiver<Vec<u8>>, Self) {
        let (sender, receiver) = new_channel(queue);
        let capacity = buffer.unwrap_or(DEFAULT_BUFFER_SIZE);
        let adapter = WriteAdapter::new(sender);
        let sink = Self {
            writer: Mutex::new(MultiLineWriter::new(adapter, capacity)),
        };
        (receiver, sink)
    }
}
impl MetricSink for BufferedSpyMetricSink {
    /// Write the bytes of `metric` into the buffering writer.
    ///
    /// Returns whatever the underlying writer reports for the write.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.writer.lock().unwrap().write(metric.as_bytes())
    }

    /// Flush the buffering writer.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    fn flush(&self) -> io::Result<()> {
        self.writer.lock().unwrap().flush()
    }
}
/// Adapter exposing the sending half of a channel through `std::io::Write`,
/// so it can serve as the destination of a `MultiLineWriter`.
#[derive(Debug)]
struct WriteAdapter {
/// Channel sender that receives a copy of every buffer passed to `write`.
sender: Sender<Vec<u8>>,
}
impl WriteAdapter {
fn new(sender: Sender<Vec<u8>>) -> Self {
WriteAdapter { sender }
}
}
impl Write for WriteAdapter {
    /// Send all of `buf` over the channel as one message.
    ///
    /// Returns `buf.len()` on success, or an `io::Error` when the channel is
    /// full or disconnected.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let sender = &self.sender;
        send_metric(sender, buf)
    }

    /// Nothing is buffered by the adapter itself, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Build the channel used by the spy sinks.
///
/// `Some(n)` creates a bounded channel with capacity `n`; `None` creates an
/// unbounded channel.
// NOTE(review): crossbeam documents a capacity of 0 as a rendezvous channel;
// combined with `try_send` that likely means sends fail unless a receiver is
// already waiting — confirm before relying on `Some(0)`.
fn new_channel(cap: Option<usize>) -> (Sender<Vec<u8>>, Receiver<Vec<u8>>) {
    match cap {
        Some(capacity) => bounded(capacity),
        None => unbounded(),
    }
}
/// Copy `metric` into a `Vec<u8>` and try to send it without blocking.
///
/// Returns the length of `metric` on success.
///
/// # Errors
///
/// Returns an `io::Error` of kind `Other` with the message "channel full"
/// when the channel has no free capacity, or "channel disconnected" when the
/// channel is disconnected.
// The lint suggests `io::Error::other`; presumably `io::Error::new` is kept for
// older toolchains — `unknown_lints` covers clippy versions lacking the lint.
#[allow(unknown_lints, clippy::io_other_error)]
fn send_metric(sender: &Sender<Vec<u8>>, metric: &[u8]) -> io::Result<usize> {
    let payload = metric.to_vec();
    let reason = match sender.try_send(payload) {
        Ok(()) => return Ok(metric.len()),
        Err(TrySendError::Full(_)) => "channel full",
        Err(TrySendError::Disconnected(_)) => "channel disconnected",
    };
    Err(io::Error::new(ErrorKind::Other, reason))
}
#[cfg(test)]
mod test {
    use super::{BufferedSpyMetricSink, MetricSink, SpyMetricSink};

    /// A metric emitted through the unbuffered sink arrives on the receiver
    /// byte-for-byte.
    #[test]
    fn test_spy_metric_sink() {
        let (rx, sink) = SpyMetricSink::new();
        sink.emit("buz:1|c").unwrap();

        let sent = rx.recv().unwrap();
        assert_eq!(b"buz:1|c", sent.as_slice());
    }

    /// Metrics buffered by the sink are expected on the receiver once the
    /// sink has been dropped, without an explicit flush.
    #[test]
    fn test_buffered_spy_metric_sink() {
        let rx = {
            // `sink` goes out of scope at the end of this block; only the
            // receiver survives.
            let (rx, sink) = BufferedSpyMetricSink::with_capacity(None, Some(64));
            sink.emit("foo:54|c").unwrap();
            sink.emit("foo:67|c").unwrap();
            rx
        };

        let sent = rx.recv().unwrap();
        assert_eq!(b"foo:54|c\nfoo:67|c\n", sent.as_slice());
    }

    /// Nothing reaches the receiver until `flush` is called; afterwards both
    /// metrics arrive as one newline-separated message.
    #[test]
    fn test_buffered_spy_metric_sink_flush() {
        let (rx, sink) = BufferedSpyMetricSink::with_capacity(None, Some(64));
        sink.emit("foo:54|c").unwrap();
        sink.emit("foo:67|c").unwrap();
        assert!(rx.is_empty());

        // Check the flush result before the blocking `recv`: `sink` keeps the
        // sender alive, so if the flush failed and sent nothing, `recv` would
        // wait forever and hang the test instead of failing it.
        let flush = sink.flush();
        assert!(flush.is_ok());

        let sent = rx.recv().unwrap();
        assert_eq!(b"foo:54|c\nfoo:67|c\n", sent.as_slice());
    }
}