Struct cadence::QueuingMetricSink

pub struct QueuingMetricSink { /* private fields */ }

Implementation of a MetricSink that wraps another implementation and uses it to emit metrics asynchronously, in another thread.

Metrics submitted to this sink are queued and sent to the wrapped sink, which runs in a separate thread. The wrapped implementation can be any MetricSink that is thread safe (Sync + Send) and panic safe (RefUnwindSafe). Results from the wrapped implementation are discarded.

The thread used for network operations (actually sending the metrics using the wrapped sink) is created and started when the QueuingMetricSink is created. Dequeuing of metrics stops and the thread is stopped when the QueuingMetricSink instance is destroyed (when .drop() is called).

This sink may be created with either a bounded or an unbounded queue connecting the sink to the thread performing network operations. When an unbounded queue is used, entries submitted to the sink are always accepted and queued until they can be drained by the network operation thread. This means that if the network thread cannot drain entries off the queue for some reason, the queue will grow without bound. Alternatively, if the sink is created with a bounded queue, entries submitted to the sink are not accepted while the queue is full. This means that the network thread must be able to keep up with the rate of entries submitted to the queue, or writes to this sink will begin to fail.
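
The difference between the two queue types can be sketched with the standard library's channels. This is only an illustration of the trade-off described above, not how QueuingMetricSink is implemented; the function names fill_unbounded and fill_bounded are made up for this sketch, and no consumer drains either queue.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Push `n` entries into an unbounded channel that nobody drains.
/// Every send is accepted, so the backlog grows to `n` entries.
fn fill_unbounded(n: usize) -> usize {
    let (tx, rx) = channel::<String>();
    for i in 0..n {
        // Sending only fails if the receiver is gone, which is not the case here.
        tx.send(format!("foo.counter:{}|c", i)).unwrap();
    }
    // Count the entries that piled up in the queue.
    rx.try_iter().count()
}

/// Push `n` entries into a bounded channel of size `capacity` that nobody drains.
/// Returns `(accepted, rejected)`.
fn fill_bounded(capacity: usize, n: usize) -> (usize, usize) {
    // `_rx` is kept alive so that failures are caused by a full queue only.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let (mut accepted, mut rejected) = (0, 0);
    for i in 0..n {
        match tx.try_send(format!("foo.counter:{}|c", i)) {
            Ok(()) => accepted += 1,
            // The queue is full: the write fails instead of blocking or growing.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a capacity of 4 and 10 writes, the bounded variant accepts the first 4 entries and rejects the remaining 6, while the unbounded variant holds on to every entry it is given.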

Entries already queued are guaranteed to be sent to the wrapped sink before the queuing sink is stopped. This means the following code ends up calling wrapped.emit(metric) for every metric submitted to the queuing sink.

§Example

use cadence::{MetricSink, QueuingMetricSink, NopMetricSink};

let wrapped = NopMetricSink;
{
    let queuing = QueuingMetricSink::from(wrapped);
    queuing.emit("foo.counter:4|c");
    queuing.emit("bar.counter:5|c");
    queuing.emit("baz.gauge:6|g");
}

At the end of this code block, all metrics are guaranteed to be sent to the underlying wrapped metric sink before the thread used by the queuing sink is stopped.

Implementations§

impl QueuingMetricSink

pub fn builder() -> QueuingMetricSinkBuilder

Construct a new builder for QueuingMetricSink.

pub fn from<T>(sink: T) -> Self
where T: MetricSink + Sync + Send + RefUnwindSafe + 'static,

Construct a new QueuingMetricSink instance wrapping another sink implementation with an unbounded queue connecting them.

The .emit() method of the wrapped sink will be executed in a different thread after being passed to it via a queue. The wrapped sink should be thread safe (Send + Sync) and panic safe (RefUnwindSafe).

The thread in which the wrapped sink runs is created when the QueuingMetricSink is created and stopped when the queuing sink is destroyed.

The queuing sink communicates with the wrapped sink through an unbounded queue. If entries cannot be drained from the queue for some reason, the queue will grow without bound.
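
The hand-off described above can be sketched with the standard library alone. Everything here is an assumption made for illustration: the Sink trait, RecordingSink, QueuingSketch and its shutdown method are hypothetical stand-ins, not cadence types, and the real sink may be built differently.

```rust
use std::io;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a sink trait; not the cadence `MetricSink` trait.
trait Sink: Send + Sync + 'static {
    fn emit(&self, metric: &str) -> io::Result<usize>;
}

/// A wrapped sink that records what it receives so the hand-off can be observed.
struct RecordingSink(Arc<Mutex<Vec<String>>>);

impl Sink for RecordingSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.0.lock().unwrap().push(metric.to_string());
        Ok(metric.len())
    }
}

/// `emit` only enqueues; a worker thread calls the wrapped sink.
struct QueuingSketch {
    sender: Sender<String>,
    worker: JoinHandle<()>,
}

impl QueuingSketch {
    fn new<T: Sink>(wrapped: T) -> Self {
        let (sender, receiver) = channel::<String>();
        let worker = thread::spawn(move || {
            // Runs until the sender is dropped and the queue is empty.
            for metric in receiver {
                // The result of the wrapped sink is discarded.
                let _ = wrapped.emit(&metric);
            }
        });
        QueuingSketch { sender, worker }
    }

    /// Enqueue the metric and report its length; the actual write happens later.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.sender
            .send(metric.to_string())
            .map(|_| metric.len())
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
    }

    /// Close the queue and wait for the worker. Present only to make the
    /// sketch observable; it is not part of the API documented here.
    fn shutdown(self) {
        drop(self.sender);
        self.worker.join().unwrap();
    }
}

/// Emit two metrics through the wrapper and return what the wrapped sink saw.
fn handoff_demo() -> Vec<String> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let queuing = QueuingSketch::new(RecordingSink(Arc::clone(&seen)));
    queuing.emit("foo.counter:4|c").unwrap();
    queuing.emit("bar.counter:5|c").unwrap();
    queuing.shutdown();
    let recorded = seen.lock().unwrap().clone();
    recorded
}
```

The caller's emit returns as soon as the metric is queued, which is why any error raised later by the wrapped sink cannot be reported back to the caller.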

§Buffered UDP Sink Example

In this example we wrap a buffered UDP sink to execute it in a different thread.

use std::net::UdpSocket;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, DEFAULT_PORT};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let host = ("metrics.example.com", DEFAULT_PORT);
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);

Example from the repository (examples/production-sink.rs):

use cadence::prelude::*;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient, DEFAULT_PORT};
use std::net::UdpSocket;
use std::time::Duration;

fn main() {
    let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
    let buffered = BufferedUdpMetricSink::from(("localhost", DEFAULT_PORT), sock).unwrap();
    let queued = QueuingMetricSink::from(buffered);
    let client = StatsdClient::from_sink("example.prefix", queued);

    client.count("example.counter", 1).unwrap();
    client.gauge("example.gauge", 5).unwrap();
    client.gauge("example.gauge", 5.0).unwrap();
    client.time("example.timer", 32).unwrap();
    client.time("example.timer", Duration::from_millis(32)).unwrap();
    client.histogram("example.histogram", 22).unwrap();
    client.histogram("example.histogram", Duration::from_nanos(22)).unwrap();
    client.histogram("example.histogram", 22.0).unwrap();
    client.distribution("example.distribution", 33).unwrap();
    client.distribution("example.distribution", 33.0).unwrap();
    client.meter("example.meter", 8).unwrap();
    client.set("example.set", 44).unwrap();
}

pub fn with_capacity<T>(sink: T, capacity: usize) -> Self
where T: MetricSink + Sync + Send + RefUnwindSafe + 'static,

Construct a new QueuingMetricSink instance wrapping another sink implementation with a queue of the given size connecting them.

The .emit() method of the wrapped sink will be executed in a different thread after being passed to it via a queue. The wrapped sink should be thread safe (Send + Sync) and panic safe (RefUnwindSafe).

The thread in which the wrapped sink runs is created when the QueuingMetricSink is created and stopped when the queuing sink is destroyed.

The queuing sink communicates with the wrapped sink by a bounded queue of the provided size. When the queue is full, writes to this sink will fail until the queue is drained.

§Buffered UDP Sink Example

In this example we wrap a buffered UDP sink to execute it in a different thread.

use std::net::UdpSocket;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, DEFAULT_PORT};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let host = ("metrics.example.com", DEFAULT_PORT);
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, 512 * 1024);
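
When choosing a capacity, a back-of-the-envelope estimate of how much metric text a full queue can hold may help. The helper below is hypothetical and assumes that the capacity counts queued metrics rather than bytes, and that an average metric length is known; per-entry allocation overhead is ignored.

```rust
/// Rough upper bound, in bytes, on the metric text held by a full queue.
/// `avg_metric_len` is an assumed average length of one metric string.
fn queue_text_bytes(capacity: usize, avg_metric_len: usize) -> usize {
    capacity * avg_metric_len
}
```

Under these assumptions, a capacity of 512 * 1024 entries at an average of 32 bytes per metric comes to 16 MiB of metric text when the queue is full.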

pub fn panics(&self) -> u64

Return the number of times the wrapped sink or underlying worker thread has panicked and needed to be restarted. In typical use this should always be 0 but may be > 0 for buggy MetricSink implementations.
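
One way such a count could be kept is sketched below, using std::panic::catch_unwind and an atomic counter. This is an assumption about the general technique, not the sink's actual code: the Sink trait and FlakySink are hypothetical, and the sketch only counts panics without restarting a thread. It also shows why a RefUnwindSafe bound is useful: a shared reference to the sink can then be used inside catch_unwind.

```rust
use std::panic::{self, RefUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a sink trait; not the cadence `MetricSink` trait.
trait Sink: RefUnwindSafe {
    fn emit(&self, metric: &str);
}

/// A deliberately buggy sink that panics on empty input.
struct FlakySink;

impl Sink for FlakySink {
    fn emit(&self, metric: &str) {
        if metric.is_empty() {
            panic!("empty metric");
        }
    }
}

/// Emit each metric, counting panics instead of letting them unwind further.
fn emit_counting_panics<T: Sink>(sink: &T, metrics: &[&str], panics: &AtomicU64) {
    for metric in metrics {
        // `&T` is `UnwindSafe` because `T: RefUnwindSafe`, so the closure
        // is allowed to cross the `catch_unwind` boundary.
        if panic::catch_unwind(|| sink.emit(metric)).is_err() {
            panics.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Feed two good and two bad metrics to the flaky sink and return the panic count.
fn panic_count_demo() -> u64 {
    let panics = AtomicU64::new(0);
    let metrics = ["foo.counter:4|c", "", "baz.gauge:6|g", ""];
    emit_counting_panics(&FlakySink, &metrics, &panics);
    panics.load(Ordering::Relaxed)
}
```

The default panic hook still prints each panic message to standard error; only the unwinding is contained.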

pub fn queued(&self) -> u64

Return the number of currently queued metrics. Note that due to the way this number is computed (submitted metrics - processed metrics), it is necessarily approximate.

pub fn submitted(&self) -> u64

Return the number of metrics successfully submitted to this sink.

pub fn drained(&self) -> u64

Return the number of metrics removed from the queue to be processed by the wrapped sink. Note that this does not indicate that the metric has been successfully sent to a backend, only that it has been passed to the wrapped sink.
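
The relationship between the queued, submitted and drained counts can be sketched with two atomic counters. QueueCounters and its methods are hypothetical names used only for this illustration. Because the two counters are read separately while other threads may be updating them, the difference is a best-effort estimate rather than a consistent snapshot.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical pair of counters, one bumped on submit and one on drain.
#[derive(Default)]
struct QueueCounters {
    submitted: AtomicU64,
    drained: AtomicU64,
}

impl QueueCounters {
    fn record_submit(&self) {
        self.submitted.fetch_add(1, Ordering::Relaxed);
    }

    fn record_drain(&self) {
        self.drained.fetch_add(1, Ordering::Relaxed);
    }

    /// Approximate queue depth: submitted minus drained.
    fn queued(&self) -> u64 {
        // Two independent loads: another thread may update either counter
        // in between, so the result is only approximate.
        let submitted = self.submitted.load(Ordering::Relaxed);
        let drained = self.drained.load(Ordering::Relaxed);
        // Guard against a momentarily stale `submitted` value.
        submitted.saturating_sub(drained)
    }
}

/// Five submissions and three drains leave two entries queued.
fn counters_demo() -> u64 {
    let counters = QueueCounters::default();
    for _ in 0..5 {
        counters.record_submit();
    }
    for _ in 0..3 {
        counters.record_drain();
    }
    counters.queued()
}
```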

Trait Implementations§

impl Clone for QueuingMetricSink

fn clone(&self) -> QueuingMetricSink

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Available since Rust 1.0.0.

impl Debug for QueuingMetricSink

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Drop for QueuingMetricSink

fn drop(&mut self)

Send the worker a signal to stop processing metrics.

Note that this destructor only sends the worker thread a signal to stop, it doesn’t wait for it to stop.
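
A destructor that signals without waiting can be sketched as follows. The Message enum, Handle struct and spawn_worker function are hypothetical and exist only for this illustration. The stop signal is modelled as a message placed on the same queue as the metrics, so everything queued before it is still processed; that is an assumption of the sketch, not a statement about the real implementation.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message type: either a metric or a request to stop.
enum Message {
    Metric(String),
    Stop,
}

/// Hypothetical producer-side handle to the worker's queue.
struct Handle {
    sender: Sender<Message>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Only signal the worker; do not join it. The error case (worker
        // already gone) is ignored.
        let _ = self.sender.send(Message::Stop);
    }
}

/// Start a worker that collects metrics until it is told to stop.
/// The join handle is returned separately so a caller may wait if it wants to.
fn spawn_worker() -> (Handle, JoinHandle<Vec<String>>) {
    let (sender, receiver) = channel();
    let worker = thread::spawn(move || {
        let mut processed = Vec::new();
        // Stops on `Message::Stop` or when the channel is closed.
        while let Ok(Message::Metric(metric)) = receiver.recv() {
            processed.push(metric);
        }
        processed
    });
    (Handle { sender }, worker)
}

/// Queue two metrics, drop the handle, then wait explicitly for the worker.
fn drop_signal_demo() -> Vec<String> {
    let (handle, worker) = spawn_worker();
    handle.sender.send(Message::Metric("foo.counter:4|c".to_string())).unwrap();
    handle.sender.send(Message::Metric("bar.counter:5|c".to_string())).unwrap();
    // Returns immediately: the destructor does not wait for the worker.
    drop(handle);
    worker.join().unwrap()
}
```

In this sketch the caller joins the worker only to observe the result; the destructor itself returns right after the signal is sent.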

impl MetricSink for QueuingMetricSink

fn emit(&self, metric: &str) -> Result<usize>

Send the Statsd metric using this sink and return the number of bytes written or an I/O error.

fn flush(&self) -> Result<(), Error>

Flush any currently buffered metrics to the underlying backend, returning an I/O error if they could not be written for some reason.

fn stats(&self) -> SinkStats

Return I/O telemetry like bytes / packets sent or dropped.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.