dbs_utils/metric.rs

// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! Defines the public components of the metric system.
//!
//! # Design
//! The main design goals of this system are:
//! * Use lockless operations, preferably ones that don't require anything other than
//!   simple reads/writes being atomic.
//! * Exploit interior mutability and atomics being `Sync` to allow all methods (including the ones
//!   which are effectively mutable) to be callable on a global non-mut static.
//! * Rely on `serde` to provide the actual serialization for writing the metrics.
//! * Since all metrics start at 0, we implement the `Default` trait via derive for all of them,
//!   to avoid having to initialize everything by hand.
//!
//! The system implements 2 types of metrics:
//! * Shared Incremental Metrics (`SharedIncMetric`) - dedicated to metrics which need a counter
//!   (e.g. the number of times an API request failed). These metrics are reset upon flush.
//! * Shared Store Metrics (`SharedStoreMetric`) - targeted at keeping a persistent value; they are
//!   not intended to act as counters (e.g. for measuring the process start-up time).
//!
//! The current approach for the `SharedIncMetric` type is to store two values (current and previous)
//! and compute the delta between them each time we do a flush (i.e. by serialization). There are a
//! number of advantages to this approach, including:
//! * We don't have to introduce an additional write (to reset the value) from the thread which
//!   does the actual writing, so less synchronization effort is required.
//! * We don't have to worry all that much about losing some data if writing fails for a while
//!   (this could be a concern, I guess).
//!
//! If it turns out this approach is not really what we want, it's pretty easy to resort to
//! something else, while working behind the same interface.
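//!
//! # Examples
//!
//! A minimal usage sketch (marked `ignore` because the `dbs_utils::metric` path, the `ApiMetrics`
//! struct and the use of `serde_json` for flushing are assumptions made for illustration):
//!
//! ```ignore
//! use dbs_utils::metric::{IncMetric, SharedIncMetric};
//! use serde::Serialize;
//!
//! // A hypothetical group of metrics; every field starts at 0 thanks to `Default`.
//! #[derive(Default, Serialize)]
//! struct ApiMetrics {
//!     requests: SharedIncMetric,
//!     failures: SharedIncMetric,
//! }
//!
//! let metrics = ApiMetrics::default();
//! metrics.requests.inc();
//! metrics.failures.add(2);
//!
//! // Serializing acts as a flush: each `SharedIncMetric` reports the delta since the
//! // previous flush and then moves its baseline forward.
//! let json = serde_json::to_string(&metrics).unwrap();
//! ```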
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Serialize, Serializer};

/// Used for defining new types of metrics that act as a counter (i.e. they are continuously updated
/// by incrementing their value).
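///
/// # Example
///
/// A small sketch of the counter semantics (marked `ignore`; the `dbs_utils::metric` path is
/// assumed for illustration):
///
/// ```ignore
/// use dbs_utils::metric::{IncMetric, SharedIncMetric};
///
/// let failed_requests = SharedIncMetric::default();
/// failed_requests.inc();  // +1
/// failed_requests.add(3); // +3
/// assert_eq!(failed_requests.count(), 4);
/// ```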
pub trait IncMetric {
    /// Adds `value` to the current counter.
    fn add(&self, value: usize);
    /// Increments the current counter by 1 unit.
    fn inc(&self) {
        self.add(1);
    }
    /// Returns the current value of the counter.
    fn count(&self) -> usize;
}

/// Representation of a metric that is expected to be incremented from more than one thread, so more
/// synchronization is necessary.
// It's currently used for vCPU metrics. An alternative here would be to have one instance of every
// metric for each thread, and to aggregate them when writing. However, this is probably overkill
// unless we have a lot of vCPUs incrementing metrics very often. Still, it's there if we ever need
// it :-s
// We keep two values for each metric so that the counters can be reset on flush:
// 1st member - the current value being updated;
// 2nd member - the old value, which is set to the current value whenever the metrics are flushed
// to disk.
#[derive(Default)]
pub struct SharedIncMetric(AtomicUsize, AtomicUsize);

impl IncMetric for SharedIncMetric {
    // While the ordering specified for this operation is still Relaxed, the actual instruction will
    // be an asm "LOCK; something" and thus atomic across multiple threads, simply because of the
    // fetch_add (as opposed to "store(load() + 1)") implementation for atomics.
    // TODO: would a stronger ordering make a difference here?
    fn add(&self, value: usize) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }

    fn count(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

impl Serialize for SharedIncMetric {
    /// Resets the counter of the metric. Here we assume that `Serialize`'s goal is to help with
    /// flushing the metrics.
    /// !!! Any print of the metrics will also reset them. Use with caution !!!
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // There's no serializer.serialize_usize() for some reason :(
        let snapshot = self.0.load(Ordering::Relaxed);
        let res = serializer.serialize_u64(snapshot as u64 - self.1.load(Ordering::Relaxed) as u64);

        if res.is_ok() {
            self.1.store(snapshot, Ordering::Relaxed);
        }
        res
    }
}

/// Used for defining new types of metrics that do not need a counter and act as a persistent indicator.
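///
/// # Example
///
/// A small sketch of the store semantics (marked `ignore`; the `dbs_utils::metric` path is assumed
/// for illustration):
///
/// ```ignore
/// use dbs_utils::metric::{SharedStoreMetric, StoreMetric};
///
/// let start_time_us = SharedStoreMetric::default();
/// start_time_us.store(1_024);
/// assert_eq!(start_time_us.fetch(), 1_024);
/// ```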
pub trait StoreMetric {
    /// Returns the current value of the metric.
    fn fetch(&self) -> usize;
    /// Stores `value` into the metric.
    fn store(&self, value: usize);
}

/// Representation of a metric that is expected to hold a value that can be accessed
/// from more than one thread, so more synchronization is necessary.
#[derive(Default)]
pub struct SharedStoreMetric(AtomicUsize);

impl StoreMetric for SharedStoreMetric {
    fn fetch(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    fn store(&self, value: usize) {
        self.0.store(value, Ordering::Relaxed);
    }
}

impl IncMetric for SharedStoreMetric {
    fn add(&self, value: usize) {
        // This operation wraps around on overflow.
        self.0.fetch_add(value, Ordering::Relaxed);
    }

    fn count(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

impl Serialize for SharedStoreMetric {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.serialize_u64(self.0.load(Ordering::Relaxed) as u64)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::fence;
    use std::sync::Arc;
    use std::thread;

    use super::*;

    #[test]
    fn test_shared_inc_metric() {
        let metric = Arc::new(SharedIncMetric::default());

        // We're going to create a number of threads that will attempt to increase this metric
        // in parallel. If everything goes fine we still can't be sure the synchronization works,
        // but if something fails, then we definitely have a problem :-s

        const NUM_THREADS_TO_SPAWN: usize = 4;
        const NUM_INCREMENTS_PER_THREAD: usize = 100_000;
        const M2_INITIAL_COUNT: usize = 123;

        metric.add(M2_INITIAL_COUNT);

        let mut v = Vec::with_capacity(NUM_THREADS_TO_SPAWN);

        for _ in 0..NUM_THREADS_TO_SPAWN {
            let r = metric.clone();
            v.push(thread::spawn(move || {
                for _ in 0..NUM_INCREMENTS_PER_THREAD {
                    r.inc();
                }
            }));
        }

        for handle in v {
            handle.join().unwrap();
        }

        assert_eq!(
            metric.count(),
            M2_INITIAL_COUNT + NUM_THREADS_TO_SPAWN * NUM_INCREMENTS_PER_THREAD
        );
    }

    #[test]
    fn test_shared_store_metric() {
        let m1 = Arc::new(SharedStoreMetric::default());
        m1.store(1);
        fence(Ordering::SeqCst);
        assert_eq!(1, m1.fetch());
    }

    #[test]
    fn test_serialize() {
        let s = serde_json::to_string(&SharedIncMetric(
            AtomicUsize::new(123),
            AtomicUsize::new(111),
        ));
        assert!(s.is_ok());
    }
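
    // An illustrative sketch of the flush semantics described in the module docs: serializing a
    // `SharedIncMetric` reports the delta since the previous flush and moves the baseline forward,
    // so a second flush with no new increments reports 0. The test name and values here are
    // illustrative.
    #[test]
    fn test_inc_metric_flush_resets_delta() {
        let metric = SharedIncMetric::default();
        metric.add(5);

        // First flush reports the 5 increments accumulated so far.
        assert_eq!(serde_json::to_string(&metric).unwrap(), "5");
        // Nothing happened since the last flush, so the reported delta is now 0.
        assert_eq!(serde_json::to_string(&metric).unwrap(), "0");

        // New increments after the flush are reported on the next flush.
        metric.inc();
        assert_eq!(serde_json::to_string(&metric).unwrap(), "1");
    }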

    #[test]
    fn test_wraps_around() {
        let m = SharedStoreMetric(AtomicUsize::new(usize::MAX));
        m.add(1);
        assert_eq!(m.count(), 0);
    }
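
    // An illustrative sketch contrasting the two metric types: unlike `SharedIncMetric`,
    // serializing a `SharedStoreMetric` reports the stored value as-is and does not reset it.
    // The test name and values here are illustrative.
    #[test]
    fn test_store_metric_serialize_does_not_reset() {
        let m = SharedStoreMetric::default();
        m.store(42);

        assert_eq!(serde_json::to_string(&m).unwrap(), "42");
        // The value persists across flushes.
        assert_eq!(serde_json::to_string(&m).unwrap(), "42");
        assert_eq!(m.fetch(), 42);
    }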
}