Struct dipstick::AtomicBucket

source ·
pub struct AtomicBucket { /* private fields */ }
Expand description

Central aggregation structure. Maintains a list of metrics for enumeration when used as source.

Implementations§

Build a new atomic stats.

Examples found in repository?
examples/bucket_cleanup.rs (line 9)
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// Example: every second, mark one long-lived marker and one freshly named marker,
/// then flush the bucket. Loops forever.
fn main() {
    // Build a new bucket.
    let bucket = AtomicBucket::new();
    // Set the default flush output for aggregated metrics to stdout.
    AtomicBucket::default_drain(Stream::write_to_stdout());

    // Defined once and reused on every iteration.
    let persistent_marker = bucket.marker("persistent");

    let mut i = 0;

    loop {
        i += 1;
        // A marker with a new name ("marker_1", "marker_2", ...) is defined on each pass.
        let transient_marker = bucket.marker(&format!("marker_{}", i));

        transient_marker.mark();
        persistent_marker.mark();

        // Collect and reset aggregated data and publish statistics;
        // `unwrap` panics if the flush does not succeed.
        bucket.flush().unwrap();

        sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/bench_bucket.rs (line 11)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Benchmark-style example: spawns as many threads as the first command-line argument
/// says, each marking the same marker in a tight loop, then flushes the bucket to
/// stdout after five seconds.
fn main() {
    let bucket = AtomicBucket::new();
    let event = bucket.marker("a");
    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Set this bucket's statistics generator to `stats_all`.
    bucket.stats(stats_all);
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
    // The worker threads are never joined; they are still looping when `main` returns.
}
examples/bench_bucket_proxy.rs (line 13)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/// Benchmark-style example: same shape as the plain bucket benchmark, but the marker
/// is defined through the default `Proxy` instead of directly on the bucket.
fn main() {
    // Marker defined on the default proxy before any target is set.
    let event = Proxy::default().marker("a");

    let bucket = AtomicBucket::new();

    // Point the default proxy at a clone of the bucket.
    // NOTE(review): presumably this makes `event` report into `bucket` — confirm against the Proxy docs.
    Proxy::default().target(bucket.clone());

    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
}
examples/observer.rs (line 21)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// Example: attach observers to a bucket so that metric values are produced by
/// callbacks instead of explicit calls from application code.
fn main() {
    // New bucket with the name "process" attached.
    let metrics = AtomicBucket::new().named("process");
    // Set this bucket's aggregated metrics flush output to stdout.
    metrics.drain(Stream::write_to_stdout());
    // Request a flush every 3 seconds.
    metrics.flush_every(Duration::from_secs(3));

    let uptime = metrics.gauge("uptime");
    // The observer closure ignores its argument and always yields 6.
    // NOTE(review): `on_flush()` presumably evaluates it on each flush — confirm.
    // The handle returned here is dropped, so this observer is never cancelled below.
    metrics.observe(uptime, |_| 6).on_flush();

    // record number of threads in pool every second
    // (`thread_count` is defined outside this excerpt.)
    let scheduled = metrics
        .observe(metrics.gauge("threads"), thread_count)
        .every(Duration::from_secs(1));

    // "heartbeat" metric
    let on_flush = metrics
        .observe(metrics.marker("heartbeat"), |_| 1)
        .on_flush();

    // Keep the process alive for 1000 x 40 ms = 40 seconds.
    for _ in 0..1000 {
        std::thread::sleep(Duration::from_millis(40));
    }

    // Cancel the two observers whose handles were kept.
    on_flush.cancel();
    scheduled.cancel();
}
examples/bucket2stdout.rs (line 8)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/// Example: feed a counter, a timer, a gauge and a marker into a bucket named "test"
/// that drains to stdout. Loops forever.
fn main() {
    // New bucket with the name "test" attached.
    let metrics = AtomicBucket::new().named("test");

    // Set this bucket's aggregated metrics flush output to stdout.
    metrics.drain(Stream::write_to_stdout());

    // Request a flush every 3 seconds.
    metrics.flush_every(Duration::from_secs(3));

    // Define one metric of each kind used below.
    let counter = metrics.counter("counter_a");
    let timer = metrics.timer("timer_a");
    let gauge = metrics.gauge("gauge_a");
    let marker = metrics.marker("marker_a");

    loop {
        // add counts forever, non-stop
        counter.count(11);
        counter.count(12);
        counter.count(13);

        // NOTE(review): `_us` presumably means microseconds (11, 12 and 13 seconds) — confirm.
        timer.interval_us(11_000_000);
        timer.interval_us(12_000_000);
        timer.interval_us(13_000_000);

        gauge.value(11);
        gauge.value(12);
        gauge.value(13);

        marker.mark();
    }
}
examples/bucket_summary.rs (line 8)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Example: feed a counter, a timer, a gauge and a marker into an unnamed bucket
/// that drains to stdout. Loops forever.
fn main() {
    let app_metrics = AtomicBucket::new();
    // Set this bucket's aggregated metrics flush output to stdout.
    app_metrics.drain(Stream::write_to_stdout());

    // Request a flush every 3 seconds.
    app_metrics.flush_every(Duration::from_secs(3));

    // Define one metric of each kind used below.
    let counter = app_metrics.counter("counter_a");
    let timer = app_metrics.timer("timer_a");
    let gauge = app_metrics.gauge("gauge_a");
    let marker = app_metrics.marker("marker_a");

    loop {
        // add counts forever, non-stop
        counter.count(11);
        counter.count(12);
        counter.count(13);

        // NOTE(review): `_us` presumably means microseconds (11, 12 and 13 seconds) — confirm.
        timer.interval_us(11_000_000);
        timer.interval_us(12_000_000);
        timer.interval_us(13_000_000);

        gauge.value(11);
        gauge.value(12);
        gauge.value(13);

        marker.mark();
    }
}

Set the default aggregated metrics statistics generator.

Examples found in repository?
examples/clopwizard.rs (line 33)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// Example: three buckets flushed on different periods (1, 5 and 15 minutes),
/// combined behind one multi-input scope and fed through the default proxy.
fn main() {
    // One bucket per aggregation period: 60 s, 300 s and 900 s.
    let one_minute = AtomicBucket::new();
    one_minute.flush_every(Duration::from_secs(60));

    let five_minutes = AtomicBucket::new();
    five_minutes.flush_every(Duration::from_secs(300));

    let fifteen_minutes = AtomicBucket::new();
    fifteen_minutes.flush_every(Duration::from_secs(900));

    // Combine the three buckets as targets of one scope named "machine_name".
    let all_buckets = MultiInputScope::new()
        .add_target(one_minute)
        .add_target(five_minutes)
        .add_target(fifteen_minutes)
        .named("machine_name");

    // send application metrics to aggregator
    Proxy::default().target(all_buckets);
    // Set the default flush output for aggregated metrics to stdout.
    AtomicBucket::default_drain(Stream::write_to_stdout());
    // Set the default aggregated metrics statistics generator to `stats_all`.
    AtomicBucket::default_stats(stats_all);

    loop {
        // `COUNTER` is defined outside this excerpt.
        COUNTER.count(17);
        sleep(Duration::from_secs(3));
    }
}
More examples
Hide additional examples
examples/custom_publish.rs (line 44)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/// Example: install a custom statistics generator as the default for all buckets,
/// then record a counter, a timer and a gauge forever.
fn main() {
    /// Custom statistics generator: called with a metric's kind, its name and one
    /// score. Returns `Some((kind, name, value))` with the possibly rewritten
    /// kind/name/value, or `None` when no value is produced for that score.
    fn custom_statistics(
        kind: InputKind,
        mut name: MetricName,
        score: ScoreType,
    ) -> Option<(InputKind, MetricName, MetricValue)> {
        match (kind, score) {
            // do not export gauge scores
            (InputKind::Gauge, _) => None,

            // prepend and append to metric name
            (_, ScoreType::Count(count)) => {
                // Remove the last name segment; with no segment to remove, export nothing.
                if let Some(last) = name.pop_back() {
                    Some((
                        // Counts are always exported with kind Counter, whatever the input kind was.
                        InputKind::Counter,
                        name.append("customized_add_prefix")
                            .append(format!("{}_and_a_suffix", last)),
                        count,
                    ))
                } else {
                    None
                }
            }

            // scaling the score value and appending unit to name
            (kind, ScoreType::Sum(sum)) => Some((kind, name.append("per_thousand"), sum / 1000)),

            // using the unmodified metric name
            (kind, ScoreType::Mean(avg)) => Some((kind, name, avg.round() as MetricValue)),

            // do not export min and max
            // (catch-all: every score type not matched above yields None)
            _ => None,
        }
    }

    // send application metrics to aggregator
    // Set the default flush output for aggregated metrics to stderr.
    AtomicBucket::default_drain(Stream::write_to_stderr());
    // Set the default aggregated metrics statistics generator to the function above.
    AtomicBucket::default_stats(custom_statistics);

    let app_metrics = AtomicBucket::new();

    // schedule aggregated metrics to be printed every 3 seconds
    app_metrics.flush_every(Duration::from_secs(3));

    let counter = app_metrics.counter("counter_a");
    let timer = app_metrics.timer("timer_b");
    let gauge = app_metrics.gauge("gauge_c");
    // Record the same three values forever, with no pause between iterations.
    loop {
        counter.count(11);
        timer.interval_us(654654);
        gauge.value(3534);
    }
}

Revert the default aggregated metrics statistics generator to the default stats_summary.

Set the default flush output for aggregated metrics.

Examples found in repository?
examples/bucket_cleanup.rs (line 10)
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// Example: every second, mark one long-lived marker and one freshly named marker,
/// then flush the bucket. Loops forever.
fn main() {
    // Build a new bucket.
    let bucket = AtomicBucket::new();
    // Set the default flush output for aggregated metrics to stdout.
    AtomicBucket::default_drain(Stream::write_to_stdout());

    // Defined once and reused on every iteration.
    let persistent_marker = bucket.marker("persistent");

    let mut i = 0;

    loop {
        i += 1;
        // A marker with a new name ("marker_1", "marker_2", ...) is defined on each pass.
        let transient_marker = bucket.marker(&format!("marker_{}", i));

        transient_marker.mark();
        persistent_marker.mark();

        // Collect and reset aggregated data and publish statistics;
        // `unwrap` panics if the flush does not succeed.
        bucket.flush().unwrap();

        sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/clopwizard.rs (line 32)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// Example: three buckets flushed on different periods (1, 5 and 15 minutes),
/// combined behind one multi-input scope and fed through the default proxy.
fn main() {
    // One bucket per aggregation period: 60 s, 300 s and 900 s.
    let one_minute = AtomicBucket::new();
    one_minute.flush_every(Duration::from_secs(60));

    let five_minutes = AtomicBucket::new();
    five_minutes.flush_every(Duration::from_secs(300));

    let fifteen_minutes = AtomicBucket::new();
    fifteen_minutes.flush_every(Duration::from_secs(900));

    // Combine the three buckets as targets of one scope named "machine_name".
    let all_buckets = MultiInputScope::new()
        .add_target(one_minute)
        .add_target(five_minutes)
        .add_target(fifteen_minutes)
        .named("machine_name");

    // send application metrics to aggregator
    Proxy::default().target(all_buckets);
    // Set the default flush output for aggregated metrics to stdout.
    AtomicBucket::default_drain(Stream::write_to_stdout());
    // Set the default aggregated metrics statistics generator to `stats_all`.
    AtomicBucket::default_stats(stats_all);

    loop {
        // `COUNTER` is defined outside this excerpt.
        COUNTER.count(17);
        sleep(Duration::from_secs(3));
    }
}
examples/custom_publish.rs (line 43)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/// Example: install a custom statistics generator as the default for all buckets,
/// then record a counter, a timer and a gauge forever.
fn main() {
    /// Custom statistics generator: called with a metric's kind, its name and one
    /// score. Returns `Some((kind, name, value))` with the possibly rewritten
    /// kind/name/value, or `None` when no value is produced for that score.
    fn custom_statistics(
        kind: InputKind,
        mut name: MetricName,
        score: ScoreType,
    ) -> Option<(InputKind, MetricName, MetricValue)> {
        match (kind, score) {
            // do not export gauge scores
            (InputKind::Gauge, _) => None,

            // prepend and append to metric name
            (_, ScoreType::Count(count)) => {
                // Remove the last name segment; with no segment to remove, export nothing.
                if let Some(last) = name.pop_back() {
                    Some((
                        // Counts are always exported with kind Counter, whatever the input kind was.
                        InputKind::Counter,
                        name.append("customized_add_prefix")
                            .append(format!("{}_and_a_suffix", last)),
                        count,
                    ))
                } else {
                    None
                }
            }

            // scaling the score value and appending unit to name
            (kind, ScoreType::Sum(sum)) => Some((kind, name.append("per_thousand"), sum / 1000)),

            // using the unmodified metric name
            (kind, ScoreType::Mean(avg)) => Some((kind, name, avg.round() as MetricValue)),

            // do not export min and max
            // (catch-all: every score type not matched above yields None)
            _ => None,
        }
    }

    // send application metrics to aggregator
    // Set the default flush output for aggregated metrics to stderr.
    AtomicBucket::default_drain(Stream::write_to_stderr());
    // Set the default aggregated metrics statistics generator to the function above.
    AtomicBucket::default_stats(custom_statistics);

    let app_metrics = AtomicBucket::new();

    // schedule aggregated metrics to be printed every 3 seconds
    app_metrics.flush_every(Duration::from_secs(3));

    let counter = app_metrics.counter("counter_a");
    let timer = app_metrics.timer("timer_b");
    let gauge = app_metrics.gauge("gauge_c");
    // Record the same three values forever, with no pause between iterations.
    loop {
        counter.count(11);
        timer.interval_us(654654);
        gauge.value(3534);
    }
}

Revert the default flush output for aggregated metrics.

👎Deprecated since 0.7.2: Use stats()

Set this stats’s statistics generator.

Set this stats’s statistics generator.

Examples found in repository?
examples/bench_bucket.rs (line 26)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Benchmark-style example: spawns as many threads as the first command-line argument
/// says, each marking the same marker in a tight loop, then flushes the bucket to
/// stdout after five seconds.
fn main() {
    let bucket = AtomicBucket::new();
    let event = bucket.marker("a");
    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Set this bucket's statistics generator to `stats_all`.
    bucket.stats(stats_all);
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
    // The worker threads are never joined; they are still looping when `main` returns.
}

Revert this stats’s statistics generator to the default stats.

👎Deprecated since 0.7.2: Use drain()

Set this stats’s aggregated metrics flush output.

Set this stats’s aggregated metrics flush output.

Examples found in repository?
examples/observer.rs (line 22)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// Example: attach observers to a bucket so that metric values are produced by
/// callbacks instead of explicit calls from application code.
fn main() {
    // New bucket with the name "process" attached.
    let metrics = AtomicBucket::new().named("process");
    // Set this bucket's aggregated metrics flush output to stdout.
    metrics.drain(Stream::write_to_stdout());
    // Request a flush every 3 seconds.
    metrics.flush_every(Duration::from_secs(3));

    let uptime = metrics.gauge("uptime");
    // The observer closure ignores its argument and always yields 6.
    // NOTE(review): `on_flush()` presumably evaluates it on each flush — confirm.
    // The handle returned here is dropped, so this observer is never cancelled below.
    metrics.observe(uptime, |_| 6).on_flush();

    // record number of threads in pool every second
    // (`thread_count` is defined outside this excerpt.)
    let scheduled = metrics
        .observe(metrics.gauge("threads"), thread_count)
        .every(Duration::from_secs(1));

    // "heartbeat" metric
    let on_flush = metrics
        .observe(metrics.marker("heartbeat"), |_| 1)
        .on_flush();

    // Keep the process alive for 1000 x 40 ms = 40 seconds.
    for _ in 0..1000 {
        std::thread::sleep(Duration::from_millis(40));
    }

    // Cancel the two observers whose handles were kept.
    on_flush.cancel();
    scheduled.cancel();
}
More examples
Hide additional examples
examples/bucket2stdout.rs (line 10)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/// Example: feed a counter, a timer, a gauge and a marker into a bucket named "test"
/// that drains to stdout. Loops forever.
fn main() {
    // New bucket with the name "test" attached.
    let metrics = AtomicBucket::new().named("test");

    // Set this bucket's aggregated metrics flush output to stdout.
    metrics.drain(Stream::write_to_stdout());

    // Request a flush every 3 seconds.
    metrics.flush_every(Duration::from_secs(3));

    // Define one metric of each kind used below.
    let counter = metrics.counter("counter_a");
    let timer = metrics.timer("timer_a");
    let gauge = metrics.gauge("gauge_a");
    let marker = metrics.marker("marker_a");

    loop {
        // add counts forever, non-stop
        counter.count(11);
        counter.count(12);
        counter.count(13);

        // NOTE(review): `_us` presumably means microseconds (11, 12 and 13 seconds) — confirm.
        timer.interval_us(11_000_000);
        timer.interval_us(12_000_000);
        timer.interval_us(13_000_000);

        gauge.value(11);
        gauge.value(12);
        gauge.value(13);

        marker.mark();
    }
}
examples/bucket_summary.rs (line 9)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Example: feed a counter, a timer, a gauge and a marker into an unnamed bucket
/// that drains to stdout. Loops forever.
fn main() {
    let app_metrics = AtomicBucket::new();
    // Set this bucket's aggregated metrics flush output to stdout.
    app_metrics.drain(Stream::write_to_stdout());

    // Request a flush every 3 seconds.
    app_metrics.flush_every(Duration::from_secs(3));

    // Define one metric of each kind used below.
    let counter = app_metrics.counter("counter_a");
    let timer = app_metrics.timer("timer_a");
    let gauge = app_metrics.gauge("gauge_a");
    let marker = app_metrics.marker("marker_a");

    loop {
        // add counts forever, non-stop
        counter.count(11);
        counter.count(12);
        counter.count(13);

        // NOTE(review): `_us` presumably means microseconds (11, 12 and 13 seconds) — confirm.
        timer.interval_us(11_000_000);
        timer.interval_us(12_000_000);
        timer.interval_us(13_000_000);

        gauge.value(11);
        gauge.value(12);
        gauge.value(13);

        marker.mark();
    }
}
examples/bucket2graphite.rs (lines 13-18)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/// Example: aggregate metrics in a bucket named "test" and drain them to a Graphite
/// endpoint at localhost:2003. Loops forever.
fn main() {
    // adding a name to the stats
    let bucket = AtomicBucket::new().named("test");

    // adding two names to Graphite output
    // metrics will be prefixed with "machine1.application.test"
    // `expect("Socket")` panics if `send_to` returns an error.
    bucket.drain(
        Graphite::send_to("localhost:2003")
            .expect("Socket")
            .named("machine1")
            .add_name("application"),
    );

    // Request a flush every 3 seconds.
    bucket.flush_every(Duration::from_secs(3));

    // Define one metric of each kind used below.
    let counter = bucket.counter("counter_a");
    let timer = bucket.timer("timer_a");
    let gauge = bucket.gauge("gauge_a");
    let marker = bucket.marker("marker_a");

    loop {
        // add counts forever, non-stop
        counter.count(11);
        counter.count(12);
        counter.count(13);

        // NOTE(review): `_us` presumably means microseconds (11, 12 and 13 seconds) — confirm.
        timer.interval_us(11_000_000);
        timer.interval_us(12_000_000);
        timer.interval_us(13_000_000);

        gauge.value(11);
        gauge.value(12);
        gauge.value(13);

        marker.mark();
    }
}
examples/proxy_multi.rs (line 37)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Example: one bucket draining to several outputs at once (Prometheus push and
/// stdout), fed through both a local default proxy and the `PROXY` item.
fn main() {
    // Placeholder to collect output targets
    // This will prefix all metrics with "my_stats"
    // before flushing them.
    let mut targets = MultiInput::new().named("my_stats");

    // Skip the metrics here... we just use this for the output
    // Follow the same pattern for Statsd, Graphite, etc.
    // `expect` panics if `push_to` returns an error.
    let prometheus = Prometheus::push_to("http://localhost:9091/metrics/job/dipstick_example")
        .expect("Prometheus Socket");
    // `add_target` takes `targets` by value and returns the extended set, hence the reassignment.
    targets = targets.add_target(prometheus);

    // Add stdout
    targets = targets.add_target(Stream::write_to_stdout());

    // Create the stats and drain targets
    let bucket = AtomicBucket::new();
    bucket.drain(targets);
    // Crucial, set the flush interval, otherwise risk hammering targets
    bucket.flush_every(Duration::from_secs(3));

    // Now wire up the proxy target with the stats and you're all set
    let proxy = Proxy::default();
    proxy.target(bucket.clone());

    // Example using the macro! Proxy sugar
    // (`PROXY` is defined outside this excerpt; it targets the bucket under the name "global".)
    PROXY.target(bucket.named("global"));

    // Record through both proxies forever, pausing 100 ms between iterations.
    loop {
        // Using the default proxy
        proxy.counter("beans").count(100);
        proxy.timer("braincells").interval_us(420);
        // global example
        PROXY.counter("my_proxy_counter").count(123);
        PROXY.timer("my_proxy_timer").interval_us(2000000);
        std::thread::sleep(Duration::from_millis(100));
    }
}

Revert this stats’s flush target to the default output.

Immediately flush the stats’s metrics to the specified scope and stats.

Examples found in repository?
examples/bench_bucket.rs (line 28)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Benchmark-style example: spawns as many threads as the first command-line argument
/// says, each marking the same marker in a tight loop, then flushes the bucket to
/// stdout after five seconds.
fn main() {
    let bucket = AtomicBucket::new();
    let event = bucket.marker("a");
    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Set this bucket's statistics generator to `stats_all`.
    bucket.stats(stats_all);
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
    // The worker threads are never joined; they are still looping when `main` returns.
}
More examples
Hide additional examples
examples/bench_bucket_proxy.rs (line 31)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/// Benchmark-style example: same shape as the plain bucket benchmark, but the marker
/// is defined through the default `Proxy` instead of directly on the bucket.
fn main() {
    // Marker defined on the default proxy before any target is set.
    let event = Proxy::default().marker("a");

    let bucket = AtomicBucket::new();

    // Point the default proxy at a clone of the bucket.
    // NOTE(review): presumably this makes `event` report into `bucket` — confirm against the Proxy docs.
    Proxy::default().target(bucket.clone());

    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
}
examples/bench_queue.rs (line 29)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/// Benchmark-style example: same shape as the plain bucket benchmark, but the marker
/// is defined on an `InputQueueScope` that wraps the bucket.
fn main() {
    let bucket = AtomicBucket::new();
    // NOTE: Wrapping an AtomicBucket with a Queue is probably useless, as it is very fast and performs no I/O.
    // NOTE(review): 10000 is presumably the queue capacity — confirm against the InputQueueScope docs.
    let queue = InputQueueScope::wrap(bucket.clone(), 10000);
    let event = queue.marker("a");
    let args = &mut args();
    // Skip the first argument (the program name, assuming `args` is `std::env::args`).
    args.next();
    // Thread count, parsed as u8; panics if the argument is missing or is not a valid u8.
    let tc: u8 = u8::from_str(&args.next().unwrap()).unwrap();
    for _ in 0..tc {
        // Each thread gets its own clone of the marker handle.
        let event = event.clone();
        thread::spawn(move || {
            loop {
                // report some metric values from our "application" loop
                event.mark();
            }
        });
    }
    // Let the worker threads run for five seconds before reporting.
    sleep(Duration::from_secs(5));
    // Immediately flush the bucket's metrics to a stdout scope;
    // `unwrap` panics if the flush does not succeed.
    bucket
        .flush_to(&Stream::write_to_stdout().metrics())
        .unwrap();
}

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more
Formats the value using the given formatter. Read more
Returns the “default value” for a type. Read more

Collect and reset aggregated data. Publish statistics.

Converts to this type from the input type.

Lookup or create scores for the requested metric.

Define a Counter.
Define a Marker.
Define a Timer.
Define a Gauge.
Define a Level.
Return attributes of component.
Return attributes of component for mutation.
Clone the component and mutate its attributes at once.

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.