1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
    cell::{RefCell, RefMut},
    ops::AddAssign,
    time::{Duration, Instant},
};

use prometheus::*;
use prometheus_static_metric::*;

use crate::util::InstantExt;

pub struct StopWatch<M: TimeMetric> {
    metric: M,
    start: Instant,
}

impl<M: TimeMetric> StopWatch<M> {
    #[inline]
    pub fn new(metric: M) -> Self {
        Self {
            metric,
            start: Instant::now(),
        }
    }

    #[inline]
    pub fn new_with(metric: M, start: Instant) -> Self {
        Self { metric, start }
    }
}

impl<M: TimeMetric> Drop for StopWatch<M> {
    fn drop(&mut self) {
        self.metric.observe(self.start.saturating_elapsed());
    }
}

/// PerfContext records cumulative performance statistics of operations.
///
/// Raft Engine will update the data in the thread-local PerfContext whenever
/// an opeartion is performed.
#[derive(Debug, Clone, Default)]
pub struct PerfContext {
    /// Time spent encoding and compressing log entries.
    pub log_populating_duration: Duration,

    /// Time spent waiting for the write group to form and get processed.
    pub write_wait_duration: Duration,

    /// Time spent writing the logs to files.
    pub log_write_duration: Duration,

    /// Time spent rotating the active log file.
    pub log_rotate_duration: Duration,

    // Time spent synchronizing logs to the disk.
    pub log_sync_duration: Duration,

    // Time spent applying the appended logs.
    pub apply_duration: Duration,
}

impl AddAssign<&'_ PerfContext> for PerfContext {
    fn add_assign(&mut self, rhs: &PerfContext) {
        self.log_populating_duration += rhs.log_populating_duration;
        self.write_wait_duration += rhs.write_wait_duration;
        self.log_write_duration += rhs.log_write_duration;
        self.log_rotate_duration += rhs.log_rotate_duration;
        self.log_sync_duration += rhs.log_sync_duration;
        self.apply_duration += rhs.apply_duration;
    }
}

thread_local! {
    static TLS_PERF_CONTEXT: RefCell<PerfContext> = RefCell::new(PerfContext::default());
}

/// Gets a copy of the thread-local PerfContext.
pub fn get_perf_context() -> PerfContext {
    TLS_PERF_CONTEXT.with(|c| c.borrow().clone())
}

/// Resets the thread-local PerfContext and takes its old value.
pub fn take_perf_context() -> PerfContext {
    TLS_PERF_CONTEXT.with(|c| c.take())
}

/// Sets the value of the thread-local PerfContext.
pub fn set_perf_context(perf_context: PerfContext) {
    TLS_PERF_CONTEXT.with(|c| *c.borrow_mut() = perf_context);
}

pub(crate) struct PerfContextField<P> {
    projector: P,
}

impl<P> PerfContextField<P>
where
    P: Fn(&mut PerfContext) -> &mut Duration,
{
    pub fn new(projector: P) -> Self {
        PerfContextField { projector }
    }
}

#[macro_export]
macro_rules! perf_context {
    ($field: ident) => {
        $crate::metrics::PerfContextField::new(|perf_context| &mut perf_context.$field)
    };
}

pub trait TimeMetric {
    fn observe(&self, duration: Duration);

    fn observe_since(&self, earlier: Instant) -> Duration {
        let dur = earlier.saturating_elapsed();
        self.observe(dur);
        dur
    }
}

impl<'a> TimeMetric for &'a Histogram {
    fn observe(&self, duration: Duration) {
        Histogram::observe(self, duration.as_secs_f64());
    }
}

impl<P> TimeMetric for PerfContextField<P>
where
    P: Fn(&mut PerfContext) -> &mut Duration,
{
    fn observe(&self, duration: Duration) {
        TLS_PERF_CONTEXT.with(|perf_context| {
            *RefMut::map(perf_context.borrow_mut(), &self.projector) += duration;
        })
    }
}

impl<M1, M2> TimeMetric for (M1, M2)
where
    M1: TimeMetric,
    M2: TimeMetric,
{
    fn observe(&self, duration: Duration) {
        self.0.observe(duration);
        self.1.observe(duration);
    }
}

make_static_metric! {
    pub label_enum LogQueueKind {
        rewrite,
        append,
    }

    pub struct LogQueueHistogramVec: Histogram {
        "type" => LogQueueKind,
    }

    pub struct LogQueueCounterVec: IntCounter {
        "type" => LogQueueKind,
    }

    pub struct LogQueueGaugeVec: IntGauge {
        "type" => LogQueueKind,
    }
}

lazy_static! {
    // Write path.
    pub static ref ENGINE_WRITE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_write_duration_seconds",
        "Bucketed histogram of Raft Engine write duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_write_preprocess_duration_seconds",
        "Bucketed histogram of Raft Engine write preprocess duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_WRITE_LEADER_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_write_leader_duration_seconds",
        "Bucketed histogram of Raft Engine write leader duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_WRITE_APPLY_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_write_apply_duration_seconds",
        "Bucketed histogram of Raft Engine write apply duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_WRITE_SIZE_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_write_size",
        "Bucketed histogram of Raft Engine write size",
        exponential_buckets(256.0, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_allocate_log_duration_seconds",
        "Bucketed histogram of Raft Engine allocate log duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref LOG_SYNC_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_sync_log_duration_seconds",
        "Bucketed histogram of Raft Engine sync log duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref LOG_ROTATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_rotate_log_duration_seconds",
        "Bucketed histogram of Raft Engine rotate log duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    // Read path.
    pub static ref ENGINE_READ_ENTRY_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_read_entry_duration_seconds",
        "Bucketed histogram of Raft Engine read entry duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_READ_ENTRY_COUNT_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_read_entry_count",
        "Bucketed histogram of Raft Engine read entry count",
        exponential_buckets(1.0, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_READ_MESSAGE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_read_message_duration_seconds",
        "Bucketed histogram of Raft Engine read message duration",
        exponential_buckets(0.00005, 1.8, 26).unwrap()
    )
    .unwrap();
    // Misc.
    pub static ref ENGINE_PURGE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_purge_duration_seconds",
        "Bucketed histogram of Raft Engine purge expired files duration",
        exponential_buckets(0.001, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_rewrite_append_duration_seconds",
        "Bucketed histogram of Raft Engine rewrite append queue duration",
        exponential_buckets(0.001, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "raft_engine_rewrite_rewrite_duration_seconds",
        "Bucketed histogram of Raft Engine rewrite rewrite queue duration",
        exponential_buckets(0.001, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref BACKGROUND_REWRITE_BYTES: LogQueueHistogramVec = register_static_histogram_vec!(
        LogQueueHistogramVec,
        "raft_engine_background_rewrite_bytes",
        "Bucketed histogram of bytes written during background rewrite",
        &["type"],
        exponential_buckets(256.0, 1.8, 22).unwrap()
    )
    .unwrap();
    pub static ref LOG_FILE_COUNT: LogQueueGaugeVec = register_static_int_gauge_vec!(
        LogQueueGaugeVec,
        "raft_engine_log_file_count",
        "Amount of log files in Raft engine",
        &["type"]
    )
    .unwrap();
    pub static ref RECYCLED_FILE_COUNT: LogQueueGaugeVec = register_static_int_gauge_vec!(
        LogQueueGaugeVec,
        "raft_engine_recycled_file_count",
        "Amount of recycled files in Raft engine",
        &["type"]
    )
    .unwrap();
    pub static ref SWAP_FILE_COUNT: IntGauge = register_int_gauge!(
        "raft_engine_swap_file_count",
        "Amount of swap files in Raft engine"
    )
    .unwrap();
    pub static ref LOG_ENTRY_COUNT: LogQueueGaugeVec = register_static_int_gauge_vec!(
        LogQueueGaugeVec,
        "raft_engine_log_entry_count",
        "Number of log entries in Raft engine",
        &["type"]
    )
    .unwrap();
    pub static ref MEMORY_USAGE: IntGauge = register_int_gauge!(
        "raft_engine_memory_usage",
        "Memory in bytes used by Raft engine",
    )
    .unwrap();
}