cadence-with-flush 0.29.0

A fork of candence with client flush support
// Cadence - An extensible Statsd client for Rust!
// Copyright 2019-2021 Nick Pillitteri
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
//> or the MIT license
// <LICENSE-MIT or>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Utilities for testing Cadence itself.
//! Functionality exported to be used by integration tests. This module
//! is NOT part of the Cadence API and is subject to change at any time.

use crate::sinks::MetricSink;
use std::fs;
use std::io::{self, ErrorKind};
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{env, thread};

/// Create a temporary directory and construct paths to files within it
/// When this object goes out of scope, any files under the temporary directory
/// it is responsible for ($TMP + $PREFIX) will be deleted.
pub struct TempDir {
    base: PathBuf,

impl TempDir {
    pub fn new<P>(prefix: P) -> io::Result<Self>
        P: AsRef<Path>,
        let base = env::temp_dir().join(prefix);
        Ok(TempDir { base })

    pub fn new_path<P>(&self, name: P) -> PathBuf
        P: AsRef<Path>,

impl Drop for TempDir {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.base);

pub trait DatagramConsumer {
    fn accept(&self, datagram: String);

impl<F> DatagramConsumer for F
    F: Fn(String),
    fn accept(&self, datagram: String) {

/// Basic server for listening on a given Unix socket path.
/// This server reads messages from a Unix datagram socket in a loop, ensures
/// they are valid UTF-8 strings, and then discards them. Any errors are printed
/// to `stderr`.
/// This server is only meant for testing Unix socket related functionality in
/// Cadence itself.
pub struct UnixSocketServer {
    ready: AtomicBool,
    shutdown: AtomicBool,
    path: PathBuf,
    consumer: Arc<dyn DatagramConsumer + Send + Sync + 'static>,
    interval: Duration,

impl UnixSocketServer {
    /// Create a new server that will listen for datagrams on the given path, using
    /// the provided interval on the read timeout as part of its main loop.
    pub fn new<P, C>(path: P, interval: Duration, consumer: C) -> Self
        P: AsRef<Path>,
        C: DatagramConsumer + Send + Sync + 'static,
        UnixSocketServer {
            ready: AtomicBool::new(false),
            shutdown: AtomicBool::new(false),
            path: path.as_ref().to_path_buf(),
            consumer: Arc::new(consumer),

    /// Has the server created the socket to listen on?
    pub fn is_ready(&self) -> bool {

    /// Run until the `.shutdown()` method is called, reading datagrams and discarding them.
    pub fn run(&self) -> io::Result<()> {
        // Make sure to remove any existing socket at the same path before we start
        // listening on it. Ignore any errors since it's entirely possible that the
        // socket file doesn't exist.
        let _ = fs::remove_file(&self.path);
        let socket = UnixDatagram::bind(&self.path)?;

        let mut buf = [0u8; 1024];, Ordering::Release);

        loop {
            match socket.recv(&mut buf) {
                Ok(v) => match std::str::from_utf8(&buf[0..v]) {
                    Ok(s) => self.consumer.accept(s.to_owned()),
                    Err(e) => eprintln!("Error: Couldn't decode string to utf-8 {}", e),
                Err(e) => {
                    // WouldBlock means we hit our receive timeout which is expected.
                    // If the "shutdown" flag has been set by the client they've sent
                    // all the metrics they are going to send and we can shutdown the
                    // server. Otherwise, just ignore the WouldBlock error.
                    if e.kind() == ErrorKind::WouldBlock {
                        if self.shutdown.load(Ordering::Acquire) {
                    } else {
                        // Some other kind of error besides hitting our receive timeout
                        eprintln!("Error: {} - {:?}", e, e.kind());


    /// Indicate that the server should stop its main run loop.
    pub fn shutdown(&self) {, Ordering::Release);

/// Wrapper around a `UnixSocketServer` to start and stop it in the course
/// of running a single test.
/// The server is stopped and the thread it was running in is joined from
/// the destructor of this struct.
pub struct UnixServerHarness {
    base: PathBuf,
    server: Option<Arc<UnixSocketServer>>,
    thread: Option<JoinHandle<()>>,

impl UnixServerHarness {
    pub fn new<P>(prefix: P) -> Self
        P: AsRef<Path>,
        UnixServerHarness {
            base: prefix.as_ref().to_path_buf(),
            server: None,
            thread: None,

    pub fn run<C, F>(mut self, consumer: C, body: F)
        C: DatagramConsumer + Send + Sync + 'static,
        F: FnOnce(&Path),
        let temp = TempDir::new(&self.base).unwrap();
        let socket = temp.new_path("cadence.sock");

        let server = Arc::new(UnixSocketServer::new(&socket, Duration::from_millis(100), consumer));
        let server_local = server.clone();

        let t = thread::spawn(move || {

        while !server.is_ready() {

        self.server = Some(server);
        self.thread = Some(t);


    pub fn run_quiet<F>(self, body: F)
        F: FnOnce(&Path),
    {|_| (), body)

impl Drop for UnixServerHarness {
    fn drop(&mut self) {
        if let Some(s) = self.server.take() {

        if let Some(t) = self.thread.take() {
            let _ = t.join();

struct Every {
    modulo: u64,
    counter: AtomicU64,

impl Every {
    fn new(modulo: u64) -> Self {
        assert_ne!(modulo, 0, "modulo must be >= 1");

        Every {
            counter: AtomicU64::new(1),

    fn allow(&self) -> bool {
        self.counter.fetch_add(1, Ordering::SeqCst) % self.modulo == 0

/// `MetricSink` implementation that can panic.
pub struct PanickingMetricSink {
    every: Every,

impl PanickingMetricSink {
    pub fn every(every: u64) -> Self {
        PanickingMetricSink {
            every: Every::new(every),

    pub fn always() -> Self {

impl MetricSink for PanickingMetricSink {
    fn emit(&self, m: &str) -> io::Result<usize> {
        if self.every.allow() {
            panic!("This sink is supposed to panic");
        } else {

/// `MetricSink` implementation that can return an error
pub struct ErrorMetricSink {
    every: Every,

impl ErrorMetricSink {
    pub fn every(every: u64) -> Self {
        ErrorMetricSink {
            every: Every::new(every),

    pub fn always() -> Self {

impl MetricSink for ErrorMetricSink {
    fn emit(&self, m: &str) -> io::Result<usize> {
        if self.every.allow() {
        } else {