Struct dipstick::AtomicBucket
source · pub struct AtomicBucket { /* private fields */ }
Expand description
Central aggregation structure. Maintains a list of metrics for enumeration when used as source.
Implementations§
source§impl AtomicBucket
impl AtomicBucket
sourcepub fn new() -> AtomicBucket
pub fn new() -> AtomicBucket
Build a new atomic stats.
Examples found in repository?
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
fn main() {
let bucket = AtomicBucket::new();
AtomicBucket::default_drain(Stream::write_to_stdout());
let persistent_marker = bucket.marker("persistent");
let mut i = 0;
loop {
i += 1;
let transient_marker = bucket.marker(&format!("marker_{}", i));
transient_marker.mark();
persistent_marker.mark();
bucket.flush().unwrap();
sleep(Duration::from_secs(1));
}
}
More examples
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
fn main() {
let bucket = AtomicBucket::new();
let event = bucket.marker("a");
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket.stats(stats_all);
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
fn main() {
let event = Proxy::default().marker("a");
let bucket = AtomicBucket::new();
Proxy::default().target(bucket.clone());
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
fn main() {
let metrics = AtomicBucket::new().named("process");
metrics.drain(Stream::write_to_stdout());
metrics.flush_every(Duration::from_secs(3));
let uptime = metrics.gauge("uptime");
metrics.observe(uptime, |_| 6).on_flush();
// record number of threads in pool every second
let scheduled = metrics
.observe(metrics.gauge("threads"), thread_count)
.every(Duration::from_secs(1));
// "heartbeat" metric
let on_flush = metrics
.observe(metrics.marker("heartbeat"), |_| 1)
.on_flush();
for _ in 0..1000 {
std::thread::sleep(Duration::from_millis(40));
}
on_flush.cancel();
scheduled.cancel();
}
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
fn main() {
let metrics = AtomicBucket::new().named("test");
metrics.drain(Stream::write_to_stdout());
metrics.flush_every(Duration::from_secs(3));
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");
let gauge = metrics.gauge("gauge_a");
let marker = metrics.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
fn main() {
let app_metrics = AtomicBucket::new();
app_metrics.drain(Stream::write_to_stdout());
app_metrics.flush_every(Duration::from_secs(3));
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_a");
let gauge = app_metrics.gauge("gauge_a");
let marker = app_metrics.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}
sourcepub fn default_stats<F>(func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
pub fn default_stats<F>(func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
Set the default aggregated metrics statistics generator.
Examples found in repository?
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
fn main() {
let one_minute = AtomicBucket::new();
one_minute.flush_every(Duration::from_secs(60));
let five_minutes = AtomicBucket::new();
five_minutes.flush_every(Duration::from_secs(300));
let fifteen_minutes = AtomicBucket::new();
fifteen_minutes.flush_every(Duration::from_secs(900));
let all_buckets = MultiInputScope::new()
.add_target(one_minute)
.add_target(five_minutes)
.add_target(fifteen_minutes)
.named("machine_name");
// send application metrics to aggregator
Proxy::default().target(all_buckets);
AtomicBucket::default_drain(Stream::write_to_stdout());
AtomicBucket::default_stats(stats_all);
loop {
COUNTER.count(17);
sleep(Duration::from_secs(3));
}
}
More examples
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
fn main() {
fn custom_statistics(
kind: InputKind,
mut name: MetricName,
score: ScoreType,
) -> Option<(InputKind, MetricName, MetricValue)> {
match (kind, score) {
// do not export gauge scores
(InputKind::Gauge, _) => None,
// prepend and append to metric name
(_, ScoreType::Count(count)) => {
if let Some(last) = name.pop_back() {
Some((
InputKind::Counter,
name.append("customized_add_prefix")
.append(format!("{}_and_a_suffix", last)),
count,
))
} else {
None
}
}
// scaling the score value and appending unit to name
(kind, ScoreType::Sum(sum)) => Some((kind, name.append("per_thousand"), sum / 1000)),
// using the unmodified metric name
(kind, ScoreType::Mean(avg)) => Some((kind, name, avg.round() as MetricValue)),
// do not export min and max
_ => None,
}
}
// send application metrics to aggregator
AtomicBucket::default_drain(Stream::write_to_stderr());
AtomicBucket::default_stats(custom_statistics);
let app_metrics = AtomicBucket::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_b");
let gauge = app_metrics.gauge("gauge_c");
loop {
counter.count(11);
timer.interval_us(654654);
gauge.value(3534);
}
}
sourcepub fn unset_default_stats()
pub fn unset_default_stats()
Revert the default aggregated metrics statistics generator to the default stats_summary
.
sourcepub fn default_drain(default_config: impl Input + 'static)
pub fn default_drain(default_config: impl Input + 'static)
Set the default stats aggregated metrics flush output.
Examples found in repository?
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
fn main() {
let bucket = AtomicBucket::new();
AtomicBucket::default_drain(Stream::write_to_stdout());
let persistent_marker = bucket.marker("persistent");
let mut i = 0;
loop {
i += 1;
let transient_marker = bucket.marker(&format!("marker_{}", i));
transient_marker.mark();
persistent_marker.mark();
bucket.flush().unwrap();
sleep(Duration::from_secs(1));
}
}
More examples
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
fn main() {
let one_minute = AtomicBucket::new();
one_minute.flush_every(Duration::from_secs(60));
let five_minutes = AtomicBucket::new();
five_minutes.flush_every(Duration::from_secs(300));
let fifteen_minutes = AtomicBucket::new();
fifteen_minutes.flush_every(Duration::from_secs(900));
let all_buckets = MultiInputScope::new()
.add_target(one_minute)
.add_target(five_minutes)
.add_target(fifteen_minutes)
.named("machine_name");
// send application metrics to aggregator
Proxy::default().target(all_buckets);
AtomicBucket::default_drain(Stream::write_to_stdout());
AtomicBucket::default_stats(stats_all);
loop {
COUNTER.count(17);
sleep(Duration::from_secs(3));
}
}
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
fn main() {
fn custom_statistics(
kind: InputKind,
mut name: MetricName,
score: ScoreType,
) -> Option<(InputKind, MetricName, MetricValue)> {
match (kind, score) {
// do not export gauge scores
(InputKind::Gauge, _) => None,
// prepend and append to metric name
(_, ScoreType::Count(count)) => {
if let Some(last) = name.pop_back() {
Some((
InputKind::Counter,
name.append("customized_add_prefix")
.append(format!("{}_and_a_suffix", last)),
count,
))
} else {
None
}
}
// scaling the score value and appending unit to name
(kind, ScoreType::Sum(sum)) => Some((kind, name.append("per_thousand"), sum / 1000)),
// using the unmodified metric name
(kind, ScoreType::Mean(avg)) => Some((kind, name, avg.round() as MetricValue)),
// do not export min and max
_ => None,
}
}
// send application metrics to aggregator
AtomicBucket::default_drain(Stream::write_to_stderr());
AtomicBucket::default_stats(custom_statistics);
let app_metrics = AtomicBucket::new();
// schedule aggregated metrics to be printed every 3 seconds
app_metrics.flush_every(Duration::from_secs(3));
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_b");
let gauge = app_metrics.gauge("gauge_c");
loop {
counter.count(11);
timer.interval_us(654654);
gauge.value(3534);
}
}
sourcepub fn unset_default_drain()
pub fn unset_default_drain()
Revert the default stats aggregated metrics flush output.
sourcepub fn set_stats<F>(&self, func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
👎Deprecated since 0.7.2: Use stats()
pub fn set_stats<F>(&self, func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
Set this stats’s statistics generator.
sourcepub fn stats<F>(&self, func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
pub fn stats<F>(&self, func: F)where
F: Fn(InputKind, MetricName, ScoreType) -> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static,
Set this stats’s statistics generator.
Examples found in repository?
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
fn main() {
let bucket = AtomicBucket::new();
let event = bucket.marker("a");
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket.stats(stats_all);
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
sourcepub fn unset_stats(&self)
pub fn unset_stats(&self)
Revert this stats’s statistics generator to the default stats.
sourcepub fn set_drain(&self, new_drain: impl Input + 'static)
👎Deprecated since 0.7.2: Use drain()
pub fn set_drain(&self, new_drain: impl Input + 'static)
Set this stats’s aggregated metrics flush output.
sourcepub fn drain(&self, new_drain: impl Input + 'static)
pub fn drain(&self, new_drain: impl Input + 'static)
Set this stats’s aggregated metrics flush output.
Examples found in repository?
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
fn main() {
let metrics = AtomicBucket::new().named("process");
metrics.drain(Stream::write_to_stdout());
metrics.flush_every(Duration::from_secs(3));
let uptime = metrics.gauge("uptime");
metrics.observe(uptime, |_| 6).on_flush();
// record number of threads in pool every second
let scheduled = metrics
.observe(metrics.gauge("threads"), thread_count)
.every(Duration::from_secs(1));
// "heartbeat" metric
let on_flush = metrics
.observe(metrics.marker("heartbeat"), |_| 1)
.on_flush();
for _ in 0..1000 {
std::thread::sleep(Duration::from_millis(40));
}
on_flush.cancel();
scheduled.cancel();
}
More examples
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
fn main() {
let metrics = AtomicBucket::new().named("test");
metrics.drain(Stream::write_to_stdout());
metrics.flush_every(Duration::from_secs(3));
let counter = metrics.counter("counter_a");
let timer = metrics.timer("timer_a");
let gauge = metrics.gauge("gauge_a");
let marker = metrics.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
fn main() {
let app_metrics = AtomicBucket::new();
app_metrics.drain(Stream::write_to_stdout());
app_metrics.flush_every(Duration::from_secs(3));
let counter = app_metrics.counter("counter_a");
let timer = app_metrics.timer("timer_a");
let gauge = app_metrics.gauge("gauge_a");
let marker = app_metrics.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
fn main() {
// adding a name to the stats
let bucket = AtomicBucket::new().named("test");
// adding two names to Graphite output
// metrics will be prefixed with "machine1.application.test"
bucket.drain(
Graphite::send_to("localhost:2003")
.expect("Socket")
.named("machine1")
.add_name("application"),
);
bucket.flush_every(Duration::from_secs(3));
let counter = bucket.counter("counter_a");
let timer = bucket.timer("timer_a");
let gauge = bucket.gauge("gauge_a");
let marker = bucket.marker("marker_a");
loop {
// add counts forever, non-stop
counter.count(11);
counter.count(12);
counter.count(13);
timer.interval_us(11_000_000);
timer.interval_us(12_000_000);
timer.interval_us(13_000_000);
gauge.value(11);
gauge.value(12);
gauge.value(13);
marker.mark();
}
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
fn main() {
// Placeholder to collect output targets
// This will prefix all metrics with "my_stats"
// before flushing them.
let mut targets = MultiInput::new().named("my_stats");
// Skip the metrics here... we just use this for the output
// Follow the same pattern for Statsd, Graphite, etc.
let prometheus = Prometheus::push_to("http://localhost:9091/metrics/job/dipstick_example")
.expect("Prometheus Socket");
targets = targets.add_target(prometheus);
// Add stdout
targets = targets.add_target(Stream::write_to_stdout());
// Create the stats and drain targets
let bucket = AtomicBucket::new();
bucket.drain(targets);
// Crucial, set the flush interval, otherwise risk hammering targets
bucket.flush_every(Duration::from_secs(3));
// Now wire up the proxy target with the stats and you're all set
let proxy = Proxy::default();
proxy.target(bucket.clone());
// Example using the macro! Proxy sugar
PROXY.target(bucket.named("global"));
loop {
// Using the default proxy
proxy.counter("beans").count(100);
proxy.timer("braincells").interval_us(420);
// global example
PROXY.counter("my_proxy_counter").count(123);
PROXY.timer("my_proxy_timer").interval_us(2000000);
std::thread::sleep(Duration::from_millis(100));
}
}
sourcepub fn unset_drain(&self)
pub fn unset_drain(&self)
Revert this stats’s flush target to the default output.
sourcepub fn flush_to(&self, publish_scope: &dyn InputScope) -> Result<()>
pub fn flush_to(&self, publish_scope: &dyn InputScope) -> Result<()>
Immediately flush the stats’s metrics to the specified scope and stats.
Examples found in repository?
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
fn main() {
let bucket = AtomicBucket::new();
let event = bucket.marker("a");
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket.stats(stats_all);
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
More examples
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
fn main() {
let event = Proxy::default().marker("a");
let bucket = AtomicBucket::new();
Proxy::default().target(bucket.clone());
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
fn main() {
let bucket = AtomicBucket::new();
// NOTE: Wrapping an AtomicBucket with a Queue probably useless, as it is very fast and performs no I/O.
let queue = InputQueueScope::wrap(bucket.clone(), 10000);
let event = queue.marker("a");
let args = &mut args();
args.next();
let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
for _ in 0..tc {
let event = event.clone();
thread::spawn(move || {
loop {
// report some metric values from our "application" loop
event.mark();
}
});
}
sleep(Duration::from_secs(5));
bucket
.flush_to(&Stream::write_to_stdout().metrics())
.unwrap();
}
Trait Implementations§
source§impl Clone for AtomicBucket
impl Clone for AtomicBucket
source§fn clone(&self) -> AtomicBucket
fn clone(&self) -> AtomicBucket
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read moresource§impl Debug for AtomicBucket
impl Debug for AtomicBucket
source§impl Default for AtomicBucket
impl Default for AtomicBucket
source§fn default() -> AtomicBucket
fn default() -> AtomicBucket
source§impl Flush for AtomicBucket
impl Flush for AtomicBucket
source§impl<S: AsRef<str>> From<S> for AtomicBucket
impl<S: AsRef<str>> From<S> for AtomicBucket
source§fn from(name: S) -> AtomicBucket
fn from(name: S) -> AtomicBucket
source§impl InputScope for AtomicBucket
impl InputScope for AtomicBucket
source§fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric
fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric
Lookup or create scores for the requested metric.
source§impl WithAttributes for AtomicBucket
impl WithAttributes for AtomicBucket
source§fn get_attributes(&self) -> &Attributes
fn get_attributes(&self) -> &Attributes
source§fn mut_attributes(&mut self) -> &mut Attributes
fn mut_attributes(&mut self) -> &mut Attributes
source§fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self
fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self
Auto Trait Implementations§
impl Freeze for AtomicBucket
impl !RefUnwindSafe for AtomicBucket
impl Send for AtomicBucket
impl Sync for AtomicBucket
impl Unpin for AtomicBucket
impl !UnwindSafe for AtomicBucket
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> Observe for Twhere
T: InputScope + WithAttributes,
impl<T> Observe for Twhere
T: InputScope + WithAttributes,
§type Inner = T
type Inner = T
ObserveWhen
. Read moresource§impl<T> OnFlush for Twhere
T: Flush + WithAttributes,
impl<T> OnFlush for Twhere
T: Flush + WithAttributes,
source§fn notify_flush_listeners(&self)
fn notify_flush_listeners(&self)
source§impl<T> Prefixed for Twhere
T: WithAttributes,
impl<T> Prefixed for Twhere
T: WithAttributes,
source§fn get_prefixes(&self) -> &NameParts
fn get_prefixes(&self) -> &NameParts
Returns namespace of component.
source§fn add_prefix<S>(&self, name: S) -> T
👎Deprecated since 0.7.2: Use named() or add_name()
fn add_prefix<S>(&self, name: S) -> T
Append a name to the existing names. Return a clone of the component with the updated names.
source§fn add_name<S>(&self, name: S) -> T
fn add_name<S>(&self, name: S) -> T
Append a name to the existing names. Return a clone of the component with the updated names.
source§fn named<S>(&self, name: S) -> T
fn named<S>(&self, name: S) -> T
Replace any existing names with a single name.
Return a clone of the component with the new name.
If multiple names are required, add_name
may also be used.
source§fn prefix_append<S: Into<MetricName>>(&self, name: S) -> MetricName
fn prefix_append<S: Into<MetricName>>(&self, name: S) -> MetricName
source§fn prefix_prepend<S: Into<MetricName>>(&self, name: S) -> MetricName
fn prefix_prepend<S: Into<MetricName>>(&self, name: S) -> MetricName
source§impl<T> ScheduleFlush for T
impl<T> ScheduleFlush for T
source§fn flush_every(&self, period: Duration) -> CancelHandle
fn flush_every(&self, period: Duration) -> CancelHandle
Flush this scope at regular intervals.