// fast_cache/replication/metrics.rs

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4#[derive(Debug, Clone, Default)]
5pub struct ReplicationMetrics {
6    inner: Arc<ReplicationMetricsInner>,
7}
8
/// Atomic storage behind [`ReplicationMetrics`].
///
/// Every field is an atomic integer so the counters can be updated through a
/// shared reference. `Default` initialises all of them to zero.
#[derive(Debug, Default)]
struct ReplicationMetricsInner {
    /// Sum of the `count` values passed to `record_emit_count`
    /// (`record_emit` contributes 1 per call).
    emitted_mutations: AtomicU64,
    /// Number of `record_batch` calls.
    sent_batches: AtomicU64,
    /// Sum of the `records` values passed to `record_batch`.
    sent_records: AtomicU64,
    /// Sum of the `compressed_bytes` values passed to `record_batch`.
    sent_bytes_compressed: AtomicU64,
    /// Sum of the `uncompressed_bytes` values passed to `record_batch`.
    sent_bytes_uncompressed: AtomicU64,
    /// Sum of the `compression_ns` values passed to `record_batch`
    /// (presumably nanoseconds, going by the name; the code only adds them up).
    compression_ns: AtomicU64,
    /// Most recently reported queue depth.
    queue_depth: AtomicUsize,
    /// Largest queue depth reported so far.
    queue_high_watermark: AtomicUsize,
    /// Number of `record_drop` calls.
    drops: AtomicU64,
    /// Number of `record_backpressure` calls.
    backpressure_events: AtomicU64,
    /// Count reported as applied via `record_replica_apply` /
    /// `record_replica_apply_batch`.
    replica_applied: AtomicU64,
    /// Count reported as skipped via `record_replica_apply` /
    /// `record_replica_apply_batch`.
    replica_skipped: AtomicU64,
    /// Sum of the `apply_ns` values passed to the replica-apply recorders
    /// (presumably nanoseconds, going by the name).
    replica_apply_ns: AtomicU64,
    /// Number of `record_backlog_catch_up` calls.
    catch_up_backlog_count: AtomicU64,
    /// Number of `record_snapshot_catch_up` calls.
    catch_up_snapshot_count: AtomicU64,
}
27
/// Plain-value copy of the replication counters, as produced by
/// `ReplicationMetrics::snapshot`.
///
/// All fields are ordinary integers, so a snapshot can be compared, cloned and
/// passed around without touching the live atomics. Equality is total (every
/// field is an integer), hence the `Eq` derive alongside `PartialEq`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplicationMetricsSnapshot {
    /// Total mutations reported as emitted.
    pub emitted_mutations: u64,
    /// Number of batches recorded.
    pub sent_batches: u64,
    /// Total records across all recorded batches.
    pub sent_records: u64,
    /// Total compressed bytes across all recorded batches.
    pub sent_bytes_compressed: u64,
    /// Total uncompressed bytes across all recorded batches.
    pub sent_bytes_uncompressed: u64,
    /// Accumulated compression time (presumably nanoseconds, per the name).
    pub compression_ns: u64,
    /// Most recently reported queue depth.
    pub queue_depth: usize,
    /// Largest queue depth reported.
    pub queue_high_watermark: usize,
    /// Number of recorded drops.
    pub drops: u64,
    /// Number of recorded backpressure events.
    pub backpressure_events: u64,
    /// Count reported as applied on the replica side.
    pub replica_applied: u64,
    /// Count reported as skipped on the replica side.
    pub replica_skipped: u64,
    /// Accumulated replica apply time (presumably nanoseconds, per the name).
    pub replica_apply_ns: u64,
    /// Number of recorded backlog catch-ups.
    pub catch_up_backlog_count: u64,
    /// Number of recorded snapshot catch-ups.
    pub catch_up_snapshot_count: u64,
}
46
47impl ReplicationMetrics {
48    pub fn record_emit(&self, queue_depth: usize) {
49        self.record_emit_count(1, queue_depth);
50    }
51
52    pub fn record_emit_count(&self, count: usize, queue_depth: usize) {
53        self.inner
54            .emitted_mutations
55            .fetch_add(count as u64, Ordering::Relaxed);
56        self.set_queue_depth(queue_depth);
57    }
58
59    pub fn record_drop(&self) {
60        self.inner.drops.fetch_add(1, Ordering::Relaxed);
61    }
62
63    pub fn record_backpressure(&self) {
64        self.inner
65            .backpressure_events
66            .fetch_add(1, Ordering::Relaxed);
67    }
68
69    pub fn record_batch(
70        &self,
71        records: usize,
72        uncompressed_bytes: usize,
73        compressed_bytes: usize,
74        compression_ns: u64,
75    ) {
76        self.inner.sent_batches.fetch_add(1, Ordering::Relaxed);
77        self.inner
78            .sent_records
79            .fetch_add(records as u64, Ordering::Relaxed);
80        self.inner
81            .sent_bytes_uncompressed
82            .fetch_add(uncompressed_bytes as u64, Ordering::Relaxed);
83        self.inner
84            .sent_bytes_compressed
85            .fetch_add(compressed_bytes as u64, Ordering::Relaxed);
86        self.inner
87            .compression_ns
88            .fetch_add(compression_ns, Ordering::Relaxed);
89    }
90
91    pub fn record_queue_depth(&self, queue_depth: usize) {
92        self.set_queue_depth(queue_depth);
93    }
94
95    pub fn record_replica_apply(&self, applied: bool, apply_ns: u64) {
96        if applied {
97            self.inner.replica_applied.fetch_add(1, Ordering::Relaxed);
98        } else {
99            self.inner.replica_skipped.fetch_add(1, Ordering::Relaxed);
100        }
101        self.inner
102            .replica_apply_ns
103            .fetch_add(apply_ns, Ordering::Relaxed);
104    }
105
106    pub fn record_replica_apply_batch(&self, applied: u64, skipped: u64, apply_ns: u64) {
107        self.inner
108            .replica_applied
109            .fetch_add(applied, Ordering::Relaxed);
110        self.inner
111            .replica_skipped
112            .fetch_add(skipped, Ordering::Relaxed);
113        self.inner
114            .replica_apply_ns
115            .fetch_add(apply_ns, Ordering::Relaxed);
116    }
117
118    pub fn record_backlog_catch_up(&self) {
119        self.inner
120            .catch_up_backlog_count
121            .fetch_add(1, Ordering::Relaxed);
122    }
123
124    pub fn record_snapshot_catch_up(&self) {
125        self.inner
126            .catch_up_snapshot_count
127            .fetch_add(1, Ordering::Relaxed);
128    }
129
130    pub fn snapshot(&self) -> ReplicationMetricsSnapshot {
131        ReplicationMetricsSnapshot {
132            emitted_mutations: self.inner.emitted_mutations.load(Ordering::Relaxed),
133            sent_batches: self.inner.sent_batches.load(Ordering::Relaxed),
134            sent_records: self.inner.sent_records.load(Ordering::Relaxed),
135            sent_bytes_compressed: self.inner.sent_bytes_compressed.load(Ordering::Relaxed),
136            sent_bytes_uncompressed: self.inner.sent_bytes_uncompressed.load(Ordering::Relaxed),
137            compression_ns: self.inner.compression_ns.load(Ordering::Relaxed),
138            queue_depth: self.inner.queue_depth.load(Ordering::Relaxed),
139            queue_high_watermark: self.inner.queue_high_watermark.load(Ordering::Relaxed),
140            drops: self.inner.drops.load(Ordering::Relaxed),
141            backpressure_events: self.inner.backpressure_events.load(Ordering::Relaxed),
142            replica_applied: self.inner.replica_applied.load(Ordering::Relaxed),
143            replica_skipped: self.inner.replica_skipped.load(Ordering::Relaxed),
144            replica_apply_ns: self.inner.replica_apply_ns.load(Ordering::Relaxed),
145            catch_up_backlog_count: self.inner.catch_up_backlog_count.load(Ordering::Relaxed),
146            catch_up_snapshot_count: self.inner.catch_up_snapshot_count.load(Ordering::Relaxed),
147        }
148    }
149
150    fn set_queue_depth(&self, queue_depth: usize) {
151        self.inner.queue_depth.store(queue_depth, Ordering::Relaxed);
152        let mut current = self.inner.queue_high_watermark.load(Ordering::Relaxed);
153        while queue_depth > current {
154            match self.inner.queue_high_watermark.compare_exchange_weak(
155                current,
156                queue_depth,
157                Ordering::Relaxed,
158                Ordering::Relaxed,
159            ) {
160                Ok(_) => break,
161                Err(next) => current = next,
162            }
163        }
164    }
165}