dbs_utils/metric.rs
// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! Defines the public components of the metric system.
//!
//! # Design
//! The main design goals of this system are:
//! * Use lockless operations, preferably ones that don't require anything other than
//!   simple reads/writes being atomic.
//! * Exploit interior mutability and atomics being `Sync` to allow all methods (including the
//!   ones which are effectively mutable) to be callable on a global non-mut static.
//! * Rely on `serde` to provide the actual serialization for writing the metrics.
//! * Since all metrics start at 0, we implement the `Default` trait via derive for all of them,
//!   to avoid having to initialize everything by hand.
//!
//! The system implements two types of metrics:
//! * Shared Incremental Metrics (`SharedIncMetric`) - dedicated to metrics that need a counter
//!   (e.g. the number of times an API request failed). These metrics are reset upon flush.
//! * Shared Store Metrics (`SharedStoreMetric`) - targeted at keeping a persistent value; they
//!   are not intended to act as counters (e.g. for storing the process start-up time).
//!
//! The current approach for the `SharedIncMetric` type is to store two values (current and
//! previous) and compute the delta between them each time we do a flush (i.e. on serialization).
//! There are a number of advantages to this approach, including:
//! * We don't have to introduce an additional write (to reset the value) from the thread which
//!   does the actual writing, so less synchronization effort is required.
//! * We don't have to worry all that much about losing some data if writing fails for a while
//!   (this could be a concern, I guess).
//!
//! If it turns out this approach is not really what we want, it's pretty easy to resort to
//! something else, while working behind the same interface.
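//!
//! # Example
//!
//! A minimal usage sketch of both metric types, assuming this module is reachable as
//! `dbs_utils::metric`:
//!
//! ```
//! use dbs_utils::metric::{IncMetric, SharedIncMetric, SharedStoreMetric, StoreMetric};
//!
//! // A counter-like metric: updated in place through a shared (non-mut) reference.
//! let failed_requests = SharedIncMetric::default();
//! failed_requests.inc();
//! failed_requests.add(2);
//! assert_eq!(failed_requests.count(), 3);
//!
//! // A store-like metric: keeps the last value that was written.
//! let start_time_us = SharedStoreMetric::default();
//! start_time_us.store(1_234);
//! assert_eq!(start_time_us.fetch(), 1_234);
//! ```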

use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Serialize, Serializer};

/// Used for defining new types of metrics that act as a counter (i.e. they are continuously
/// updated by incrementing their value).
pub trait IncMetric {
    /// Adds `value` to the current counter.
    fn add(&self, value: usize);
    /// Increments the current counter by one unit.
    fn inc(&self) {
        self.add(1);
    }
    /// Returns the current value of the counter.
    fn count(&self) -> usize;
}

/// Representation of a metric that is expected to be incremented from more than one thread, so
/// more synchronization is necessary.
// It's currently used for vCPU metrics. An alternative here would be
// to have one instance of every metric for each thread, and to
// aggregate them when writing. However, this is probably overkill unless we have a lot of vCPUs
// incrementing metrics very often. Still, it's there if we ever need it :-s
// We keep two values for each metric so that the counters can be reset on flush:
// 1st member - current value being updated
// 2nd member - old value, which takes the current value whenever the metric is flushed to disk
#[derive(Default)]
pub struct SharedIncMetric(AtomicUsize, AtomicUsize);

impl IncMetric for SharedIncMetric {
    // While the order specified for this operation is still Relaxed, the actual instruction will
    // be an asm "LOCK; something" and thus atomic across multiple threads, simply because of the
    // fetch_add (as opposed to "store(load() + 1)") implementation for atomics.
    // TODO: would a stronger ordering make a difference here?
    fn add(&self, value: usize) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }

    fn count(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

impl Serialize for SharedIncMetric {
    /// Resets the counter of the metric. Here we assume that `Serialize`'s goal is to help with
    /// flushing the metrics.
    /// !!! Any print of the metrics will also reset them. Use with caution !!!
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // There's no serializer.serialize_usize() for some reason :(
        let snapshot = self.0.load(Ordering::Relaxed);
        let res = serializer.serialize_u64(snapshot as u64 - self.1.load(Ordering::Relaxed) as u64);

        if res.is_ok() {
            self.1.store(snapshot, Ordering::Relaxed);
        }
        res
    }
}

/// Used for defining new types of metrics that do not need a counter and act as a persistent indicator.
pub trait StoreMetric {
    /// Returns the current value of the metric.
    fn fetch(&self) -> usize;
    /// Stores `value` into the metric.
    fn store(&self, value: usize);
}

/// Representation of a metric that is expected to hold a value that can be accessed
/// from more than one thread, so more synchronization is necessary.
#[derive(Default)]
pub struct SharedStoreMetric(AtomicUsize);

impl StoreMetric for SharedStoreMetric {
    fn fetch(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    fn store(&self, value: usize) {
        self.0.store(value, Ordering::Relaxed);
    }
}

impl IncMetric for SharedStoreMetric {
    fn add(&self, value: usize) {
        // This operation wraps around on overflow.
        self.0.fetch_add(value, Ordering::Relaxed);
    }

    fn count(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

impl Serialize for SharedStoreMetric {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.serialize_u64(self.0.load(Ordering::Relaxed) as u64)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::fence;
    use std::sync::Arc;
    use std::thread;

    use super::*;

    #[test]
    fn test_shared_inc_metric() {
        let metric = Arc::new(SharedIncMetric::default());

        // We're going to create a number of threads that will attempt to increase this metric
        // in parallel. If everything goes fine we still can't be sure the synchronization works,
        // but if something fails, then we definitely have a problem :-s

        const NUM_THREADS_TO_SPAWN: usize = 4;
        const NUM_INCREMENTS_PER_THREAD: usize = 100_000;
        const M2_INITIAL_COUNT: usize = 123;

        metric.add(M2_INITIAL_COUNT);

        let mut v = Vec::with_capacity(NUM_THREADS_TO_SPAWN);

        for _ in 0..NUM_THREADS_TO_SPAWN {
            let r = metric.clone();
            v.push(thread::spawn(move || {
                for _ in 0..NUM_INCREMENTS_PER_THREAD {
                    r.inc();
                }
            }));
        }

        for handle in v {
            handle.join().unwrap();
        }

        assert_eq!(
            metric.count(),
            M2_INITIAL_COUNT + NUM_THREADS_TO_SPAWN * NUM_INCREMENTS_PER_THREAD
        );
    }

    #[test]
    fn test_shared_store_metric() {
        let m1 = Arc::new(SharedStoreMetric::default());
        m1.store(1);
        fence(Ordering::SeqCst);
        assert_eq!(1, m1.fetch());
    }

    #[test]
    fn test_serialize() {
        let s = serde_json::to_string(&SharedIncMetric(
            AtomicUsize::new(123),
            AtomicUsize::new(111),
        ));
        assert!(s.is_ok());
    }
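
    /// An additional sketch of the flush semantics described above: serializing a
    /// `SharedIncMetric` reports the delta since the previous flush and then resets it, so a
    /// second serialization with no increments in between yields 0.
    #[test]
    fn test_serialize_resets_delta() {
        let metric = SharedIncMetric::default();
        metric.add(5);

        // The first flush reports everything accumulated so far.
        assert_eq!(serde_json::to_string(&metric).unwrap(), "5");
        // With no increments in between, the second flush reports an empty delta.
        assert_eq!(serde_json::to_string(&metric).unwrap(), "0");
    }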

    #[test]
    fn test_wraps_around() {
        let m = SharedStoreMetric(AtomicUsize::new(usize::MAX));
        m.add(1);
        assert_eq!(m.count(), 0);
    }
}