use crate::sinks::core::{MetricSink, SinkStats};
use crate::sync::ExecuteStats;
use crossbeam_channel::{self, Receiver, Sender, TrySendError};
use std::fmt;
use std::io::{self, ErrorKind};
use std::panic::RefUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
/// Builder used to configure and create a [`QueuingMetricSink`].
///
/// Both settings are optional. The derived `Default` leaves them as `None`,
/// which means "no error handler" and "unbounded queue" respectively.
#[derive(Default)]
pub struct QueuingMetricSinkBuilder {
/// Callback invoked with the `io::Error` returned by the wrapped sink's
/// `emit` when emitting a queued metric fails. When `None`, such errors
/// are discarded.
error_handler: Option<Box<dyn Fn(io::Error) + Sync + Send + RefUnwindSafe + 'static>>,
/// Maximum number of entries the queue may hold. `None` selects an
/// unbounded queue.
capacity: Option<usize>,
}
impl QueuingMetricSinkBuilder {
    /// Create a builder with no error handler and no queue capacity limit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Bound the queue to `capacity` entries.
    ///
    /// Without this call the queue created by [`build`](Self::build) is
    /// unbounded.
    pub fn with_capacity(self, capacity: usize) -> Self {
        Self {
            capacity: Some(capacity),
            ..self
        }
    }

    /// Register a callback that receives every `io::Error` returned by the
    /// wrapped sink while queued metrics are being emitted.
    ///
    /// A later call replaces a previously registered handler.
    pub fn with_error_handler<F>(self, error_handler: F) -> Self
    where
        F: Fn(io::Error) + Sync + Send + RefUnwindSafe + 'static,
    {
        let boxed: Box<dyn Fn(io::Error) + Sync + Send + RefUnwindSafe + 'static> =
            Box::new(error_handler);
        Self {
            error_handler: Some(boxed),
            ..self
        }
    }

    /// Consume the builder and create a [`QueuingMetricSink`] wrapping `sink`.
    ///
    /// The wrapped sink is shared between the returned value (which uses it
    /// for `flush` and `stats`) and the worker task (which uses it to emit
    /// queued metrics). The worker's `run` loop is handed to
    /// `crate::sync::execute` before this method returns.
    pub fn build<T>(self, sink: T) -> QueuingMetricSink
    where
        T: MetricSink + Sync + Send + RefUnwindSafe + 'static,
    {
        let Self {
            error_handler,
            capacity,
        } = self;

        let sink = Arc::new(sink);
        let emitter = Arc::clone(&sink);

        // Runs once per dequeued metric: forward it to the wrapped sink and
        // report a failure to the handler, if one was configured.
        let forward = move |metric: String| {
            if let Err(err) = emitter.emit(&metric) {
                if let Some(report) = error_handler.as_ref() {
                    report(err);
                }
            }
        };

        let worker = Arc::new(Worker::new(capacity, forward));
        let runner = Arc::clone(&worker);
        let executor = crate::sync::execute(move || runner.run());

        QueuingMetricSink {
            worker,
            executor,
            sink,
        }
    }
}
/// A `MetricSink` that wraps another sink and emits metrics asynchronously.
///
/// `emit` only places the metric on a queue; a worker whose `run` loop is
/// started through `crate::sync::execute` takes metrics off the queue and
/// passes them to the wrapped sink.
///
/// Cloning is cheap: every field is an `Arc`, so all clones share the same
/// queue, worker and wrapped sink.
#[derive(Clone)]
pub struct QueuingMetricSink {
/// Queue plus the drain loop that feeds the wrapped sink.
worker: Arc<Worker>,
/// Handle returned by `crate::sync::execute`; read by `panics()`.
executor: Arc<ExecuteStats>,
/// The wrapped sink, used directly for `flush` and `stats`.
sink: Arc<dyn MetricSink + Send + Sync + RefUnwindSafe>,
}
impl fmt::Debug for QueuingMetricSink {
    /// Formats the sink as `QueuingMetricSink { <worker> }`, where `<worker>`
    /// is the `Debug` output of the shared worker.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("QueuingMetricSink {{ {:?} }}", self.worker))
    }
}
impl QueuingMetricSink {
    /// Return a builder for creating a customized `QueuingMetricSink`.
    pub fn builder() -> QueuingMetricSinkBuilder {
        QueuingMetricSinkBuilder::new()
    }

    /// Wrap `sink` using the default configuration: an unbounded queue and
    /// no error handler.
    pub fn from<T>(sink: T) -> Self
    where
        T: MetricSink + Sync + Send + RefUnwindSafe + 'static,
    {
        let builder = Self::builder();
        builder.build(sink)
    }

    /// Wrap `sink` using a queue bounded to `capacity` entries.
    ///
    /// Once the queue is full, `emit` returns an error instead of blocking.
    pub fn with_capacity<T>(sink: T, capacity: usize) -> Self
    where
        T: MetricSink + Sync + Send + RefUnwindSafe + 'static,
    {
        let builder = Self::builder().with_capacity(capacity);
        builder.build(sink)
    }

    /// Number of metrics accepted onto the queue so far.
    pub fn submitted(&self) -> u64 {
        let stats = &self.worker.stats;
        stats.submitted()
    }

    /// Number of metrics the worker has taken off the queue so far.
    ///
    /// A metric is counted as soon as it is dequeued, before it is handed to
    /// the wrapped sink.
    pub fn drained(&self) -> u64 {
        let stats = &self.worker.stats;
        stats.drained()
    }

    /// Approximate number of metrics still waiting in the queue, computed as
    /// submitted minus drained (never below zero).
    pub fn queued(&self) -> u64 {
        let stats = &self.worker.stats;
        stats.queued()
    }

    /// Panic count reported by the executor that drives the worker.
    pub fn panics(&self) -> u64 {
        self.executor.panics()
    }
}
impl MetricSink for QueuingMetricSink {
    /// Queue `metric` for emission by the worker.
    ///
    /// Returns `Ok(metric.len())` once the metric has been accepted onto the
    /// queue; this says nothing about whether the wrapped sink later emits
    /// it successfully.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error with the message `"channel full"`
    /// when the queue has no room, or `"channel disconnected"` when the
    /// channel reports disconnection. This method never blocks.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let rejected = match self.worker.submit(metric.to_string()) {
            Ok(()) => return Ok(metric.len()),
            Err(cause) => cause,
        };

        let reason = match rejected {
            TrySendError::Full(_) => "channel full",
            TrySendError::Disconnected(_) => "channel disconnected",
        };

        // `ErrorKind::Other` is spelled out and the newer clippy lint is
        // silenced; `unknown_lints` keeps older toolchains quiet about it.
        #[allow(unknown_lints, clippy::io_other_error)]
        let failure = io::Error::new(ErrorKind::Other, reason);
        Err(failure)
    }

    /// Flush the wrapped sink directly. Metrics still waiting in the queue
    /// are not drained by this call.
    fn flush(&self) -> io::Result<()> {
        self.sink.flush()
    }

    /// Return the statistics reported by the wrapped sink.
    fn stats(&self) -> SinkStats {
        self.sink.stats()
    }
}
impl Drop for QueuingMetricSink {
fn drop(&mut self) {
self.worker.stop();
}
}
/// Counters describing how many entries have passed through a worker's queue.
///
/// Both counters start at zero and only ever increase. They are updated and
/// read with relaxed atomics, so readers get an approximate snapshot.
#[derive(Debug, Default)]
struct WorkerStats {
    /// Entries accepted onto the queue.
    submitted: AtomicU64,
    /// Entries taken off the queue.
    drained: AtomicU64,
}

impl WorkerStats {
    /// Memory ordering used for every counter access.
    const ORDER: Ordering = Ordering::Relaxed;

    /// Current number of submitted entries.
    fn submitted(&self) -> u64 {
        self.submitted.load(Self::ORDER)
    }

    /// Current number of drained entries.
    fn drained(&self) -> u64 {
        self.drained.load(Self::ORDER)
    }

    /// Record that one more entry was accepted onto the queue.
    fn incr_submitted(&self) {
        let _previous = self.submitted.fetch_add(1, Self::ORDER);
    }

    /// Record that one more entry was taken off the queue.
    fn incr_drained(&self) {
        let _previous = self.drained.fetch_add(1, Self::ORDER);
    }

    /// Approximate number of entries still queued.
    ///
    /// The two counters are read independently, so `drained` may briefly be
    /// ahead of the `submitted` value read here; the subtraction saturates
    /// at zero instead of wrapping.
    fn queued(&self) -> u64 {
        let accepted = self.submitted();
        let taken = self.drained();
        accepted.saturating_sub(taken)
    }
}
/// A queue of `String` values together with the task that consumes them.
///
/// Values are pushed with `submit` and consumed by `run`, which is meant to
/// be driven from another thread. The channel carries `Option<String>`:
/// `Some(v)` is a value to process and `None` is the signal telling `run`
/// to stop.
struct Worker {
/// Callback invoked by `run` for every value taken off the queue.
task: Box<dyn Fn(String) + Sync + Send + RefUnwindSafe + 'static>,
/// Sending half of the queue, used by `submit` and `stop`.
sender: Sender<Option<String>>,
/// Receiving half of the queue, read by `run`.
receiver: Receiver<Option<String>>,
/// Set to `true` when `run` returns normally; it stays `false` if the task
/// panics out of `run`.
stopped: AtomicBool,
/// Counters for submitted and drained values.
stats: WorkerStats,
}
impl Worker {
    /// Create a worker that calls `task` for each submitted value.
    ///
    /// `capacity` bounds the queue when `Some`; `None` creates an unbounded
    /// queue. Nothing is processed until `run` is called.
    fn new<F>(capacity: Option<usize>, task: F) -> Self
    where
        F: Fn(String) + Sync + Send + RefUnwindSafe + 'static,
    {
        let (sender, receiver) = Self::get_channels(capacity);
        Self {
            task: Box::new(task),
            sender,
            receiver,
            stopped: AtomicBool::new(false),
            stats: WorkerStats::default(),
        }
    }

    /// Create the channel pair backing the queue: bounded when a capacity is
    /// given, unbounded otherwise.
    fn get_channels(capacity: Option<usize>) -> (Sender<Option<String>>, Receiver<Option<String>>) {
        match capacity {
            Some(limit) => crossbeam_channel::bounded(limit),
            None => crossbeam_channel::unbounded(),
        }
    }

    /// Try to enqueue `v` without blocking.
    ///
    /// The submitted counter is incremented only when the value was accepted.
    /// On failure the channel's `TrySendError` is returned unchanged.
    fn submit(&self, v: String) -> Result<(), TrySendError<Option<String>>> {
        self.sender.try_send(Some(v))?;
        self.stats.incr_submitted();
        Ok(())
    }

    /// Process queued values until a stop signal (`None`) is received or the
    /// channel can yield no more messages, then mark the worker as stopped.
    ///
    /// Each value is counted as drained before the task is invoked, so a
    /// value whose task panics is still counted. A panic unwinds out of this
    /// method and leaves the stopped flag unset.
    fn run(&self) {
        while let Ok(Some(value)) = self.receiver.recv() {
            self.stats.incr_drained();
            (self.task)(value);
        }
        self.stopped.store(true, Ordering::Release);
    }

    /// Enqueue the stop signal without blocking.
    ///
    /// Values queued ahead of the signal are still processed first. The
    /// result is deliberately ignored: if a bounded queue is full the signal
    /// is not delivered and the worker keeps running.
    fn stop(&self) {
        let _ = self.sender.try_send(None);
    }

    /// Send the stop signal and spin, yielding the thread, until `run` has
    /// marked the worker as stopped.
    #[cfg(test)]
    fn stop_and_wait(&self) {
        self.stop();
        loop {
            if self.is_stopped() {
                break;
            }
            std::thread::yield_now();
        }
    }

    /// Whether the queue currently holds no messages.
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }

    /// Whether `run` has returned normally.
    #[cfg(test)]
    fn is_stopped(&self) -> bool {
        self.stopped.load(Ordering::Acquire)
    }
}
impl fmt::Debug for Worker {
    /// Writes the fixed placeholder `Worker { ... }`; no field is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Worker { ... }")
    }
}
#[cfg(test)]
mod tests {
    use super::{QueuingMetricSink, Worker};
    use crate::sinks::MetricSink;
    use crate::sinks::SpyMetricSink;
    use crate::test::PanickingMetricSink;
    use std::io;
    use std::panic;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    /// Queue capacity used by the worker-level tests.
    const QUEUE_SIZE: Option<usize> = Some(128);

    /// Values submitted before the stop signal are handed to the task.
    #[test]
    fn test_worker_submit_processes_event() {
        let saw_foo = Arc::new(AtomicBool::new(false));
        let saw_foo_in_task = Arc::clone(&saw_foo);

        let worker = Arc::new(Worker::new(QUEUE_SIZE, move |v: String| {
            if v == "foo" {
                saw_foo_in_task.store(true, Ordering::Release);
            }
        }));
        let runner = Arc::clone(&worker);
        let handle = thread::spawn(move || runner.run());

        worker.submit("bar".to_string()).unwrap();
        worker.submit("foo".to_string()).unwrap();
        worker.stop();
        handle.join().unwrap();

        assert!(saw_foo.load(Ordering::Acquire));
    }

    /// The stop signal makes `run` return and sets the stopped flag.
    #[test]
    fn test_worker_stop() {
        let worker = Arc::new(Worker::new(QUEUE_SIZE, move |_: String| {}));
        let runner = Arc::clone(&worker);
        let handle = thread::spawn(move || runner.run());

        worker.stop();
        handle.join().unwrap();

        assert!(worker.is_stopped());
    }

    /// `stop_and_wait` returns only after the worker has stopped, without
    /// joining the thread.
    #[test]
    fn test_worker_stop_and_wait() {
        let worker = Arc::new(Worker::new(QUEUE_SIZE, move |_: String| {}));
        let runner = Arc::clone(&worker);
        let _handle = thread::spawn(move || runner.run());

        worker.stop_and_wait();

        assert!(worker.is_stopped());
    }

    /// A panic on the submitting thread leaves the worker usable: it still
    /// stops cleanly and nothing is left in the queue.
    #[test]
    fn test_worker_panic_on_submit_side() {
        let worker = Arc::new(Worker::new(QUEUE_SIZE, move |_: String| {}));
        let submitter = Arc::clone(&worker);
        let runner = Arc::clone(&worker);

        #[allow(unreachable_code)]
        #[allow(clippy::diverging_sub_expression)]
        let submit_thread = thread::spawn(move || {
            submitter.submit(panic!("This thread is supposed to panic")).unwrap();
        });
        let run_thread = thread::spawn(move || runner.run());

        worker.stop();

        assert!(submit_thread.join().is_err());
        assert!(run_thread.join().is_ok());
        assert!(worker.is_stopped());
        assert!(worker.is_empty());
    }

    /// A panicking task takes down the running thread; the value was already
    /// dequeued, and the stopped flag is never set.
    #[test]
    fn test_worker_panic_on_run_side() {
        let worker = Arc::new(Worker::new(QUEUE_SIZE, move |_: String| {
            panic!("This thread is supposed to panic");
        }));
        let submitter = Arc::clone(&worker);
        let runner = Arc::clone(&worker);

        let submit_thread = thread::spawn(move || {
            submitter.submit("foo".to_owned()).unwrap();
        });
        let run_thread = thread::spawn(move || runner.run());

        assert!(submit_thread.join().is_ok());
        assert!(run_thread.join().is_err());
        assert!(!worker.is_stopped());
        assert!(worker.is_empty());
    }

    /// Metrics pass through the queue to the wrapped sink in order.
    #[test]
    fn test_queuing_sink_emit() {
        let (rx, spy) = SpyMetricSink::new();
        let queuing = QueuingMetricSink::from(spy);

        queuing.emit("foo.counter:1|c").unwrap();
        queuing.emit("bar.counter:2|c").unwrap();
        queuing.emit("baz.counter:3|c").unwrap();
        queuing.worker.stop_and_wait();

        let first = rx.try_recv().unwrap();
        let second = rx.try_recv().unwrap();
        let third = rx.try_recv().unwrap();

        assert_eq!("foo.counter:1|c".as_bytes(), first.as_slice());
        assert_eq!("bar.counter:2|c".as_bytes(), second.as_slice());
        assert_eq!("baz.counter:3|c".as_bytes(), third.as_slice());
    }

    /// Every panic raised by the wrapped sink is counted.
    #[test]
    fn test_queuing_sink_emit_panics() {
        let queuing = QueuingMetricSink::from(PanickingMetricSink::always());

        queuing.emit("foo.counter:4|c").unwrap();
        queuing.emit("foo.counter:5|c").unwrap();
        queuing.emit("foo.timer:34|ms").unwrap();
        queuing.worker.stop_and_wait();

        assert_eq!(3, queuing.panics());
    }

    /// After a panic in the wrapped sink, the remaining metrics are still
    /// drained.
    #[test]
    fn test_queuing_sink_emit_recover_from_panics() {
        let queuing = QueuingMetricSink::from(PanickingMetricSink::every(2));

        queuing.emit("foo.counter:4|c").unwrap();
        queuing.emit("foo.counter:5|c").unwrap();
        queuing.emit("foo.timer:34|ms").unwrap();
        queuing.worker.stop_and_wait();

        assert_eq!(1, queuing.panics());
        assert_eq!(3, queuing.drained());
    }

    /// Panics in the wrapped sink never reach the thread that emits metrics.
    #[test]
    fn test_queuing_sink_panic_handler() {
        let queuing = QueuingMetricSink::from(PanickingMetricSink::always());

        let outcome = panic::catch_unwind(move || {
            queuing.emit("foo.counter:4|c").unwrap();
            queuing.emit("foo.counter:5|c").unwrap();
            queuing.emit("foo.timer:34|ms").unwrap();
            queuing.worker.stop_and_wait();
        });

        assert!(outcome.is_ok());
    }

    /// With a capacity-one queue and a sink that never returns, some
    /// submissions are accepted and at least one is rejected rather than
    /// blocking the caller.
    #[test]
    fn test_queuing_metric_sink_blocking_sink_back_pressure() {
        /// Sink whose `emit` parks the calling thread forever.
        struct BlockingMetricSink;

        impl MetricSink for BlockingMetricSink {
            fn emit(&self, _m: &str) -> io::Result<usize> {
                loop {
                    thread::park();
                }
            }
        }

        let sink = QueuingMetricSink::with_capacity(BlockingMetricSink, 1);
        let results = [
            sink.emit("foo.counter:1|c"),
            sink.emit("foo.counter:2|c"),
            sink.emit("foo.counter:3|c"),
        ];

        let success = results.iter().filter(|r| r.is_ok()).count();
        let failure = results.iter().filter(|r| r.is_err()).count();

        assert!(
            success >= 1,
            "At least one submission to the queue should have succeeded"
        );
        assert!(failure >= 1, "At least one submission to the queue should have failed");
    }
}