1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4#[derive(Debug, Clone, Default)]
5pub struct ReplicationMetrics {
6 inner: Arc<ReplicationMetricsInner>,
7}
8
/// Atomic storage behind [`ReplicationMetrics`].
///
/// Every field is an independent atomic, so updates need only `&self`.
/// All fields start at zero via `Default`.
#[derive(Default, Debug)]
struct ReplicationMetricsInner {
    /// Total mutations emitted (sum of the `count` values recorded on emit).
    emitted_mutations: AtomicU64,
    /// Number of batches recorded via `record_batch`.
    sent_batches: AtomicU64,
    /// Sum of the per-batch record counts.
    sent_records: AtomicU64,
    /// Sum of the per-batch compressed sizes.
    sent_bytes_compressed: AtomicU64,
    /// Sum of the per-batch uncompressed sizes.
    sent_bytes_uncompressed: AtomicU64,
    /// Accumulated compression time (nanoseconds, going by the `_ns` suffix).
    compression_ns: AtomicU64,
    /// Most recently recorded queue depth.
    queue_depth: AtomicUsize,
    /// Largest queue depth recorded so far.
    queue_high_watermark: AtomicUsize,
    /// Number of recorded drops.
    drops: AtomicU64,
    /// Number of recorded backpressure events.
    backpressure_events: AtomicU64,
    /// Count of replica records reported as applied.
    replica_applied: AtomicU64,
    /// Count of replica records reported as skipped.
    replica_skipped: AtomicU64,
    /// Accumulated replica apply time (nanoseconds, going by the `_ns` suffix).
    replica_apply_ns: AtomicU64,
    /// Number of recorded backlog-based catch-ups.
    catch_up_backlog_count: AtomicU64,
    /// Number of recorded snapshot-based catch-ups.
    catch_up_snapshot_count: AtomicU64,
}
27
/// Plain-value copy of the replication counters, as produced by
/// `ReplicationMetrics::snapshot`.
///
/// Each field mirrors the counter of the same name. The counters are read one
/// at a time, so a snapshot taken while writers are active is not guaranteed
/// to be a single consistent cut across fields.
///
/// All fields are integers, so the type derives `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplicationMetricsSnapshot {
    /// Total mutations emitted.
    pub emitted_mutations: u64,
    /// Number of batches sent.
    pub sent_batches: u64,
    /// Total records across all sent batches.
    pub sent_records: u64,
    /// Total compressed bytes across all sent batches.
    pub sent_bytes_compressed: u64,
    /// Total uncompressed bytes across all sent batches.
    pub sent_bytes_uncompressed: u64,
    /// Accumulated compression time (nanoseconds, going by the `_ns` suffix).
    pub compression_ns: u64,
    /// Most recently recorded queue depth.
    pub queue_depth: usize,
    /// Largest queue depth recorded so far.
    pub queue_high_watermark: usize,
    /// Number of recorded drops.
    pub drops: u64,
    /// Number of recorded backpressure events.
    pub backpressure_events: u64,
    /// Count of replica records reported as applied.
    pub replica_applied: u64,
    /// Count of replica records reported as skipped.
    pub replica_skipped: u64,
    /// Accumulated replica apply time (nanoseconds, going by the `_ns` suffix).
    pub replica_apply_ns: u64,
    /// Number of recorded backlog-based catch-ups.
    pub catch_up_backlog_count: u64,
    /// Number of recorded snapshot-based catch-ups.
    pub catch_up_snapshot_count: u64,
}
46
47impl ReplicationMetrics {
48 pub fn record_emit(&self, queue_depth: usize) {
49 self.record_emit_count(1, queue_depth);
50 }
51
52 pub fn record_emit_count(&self, count: usize, queue_depth: usize) {
53 self.inner
54 .emitted_mutations
55 .fetch_add(count as u64, Ordering::Relaxed);
56 self.set_queue_depth(queue_depth);
57 }
58
59 pub fn record_drop(&self) {
60 self.inner.drops.fetch_add(1, Ordering::Relaxed);
61 }
62
63 pub fn record_backpressure(&self) {
64 self.inner
65 .backpressure_events
66 .fetch_add(1, Ordering::Relaxed);
67 }
68
69 pub fn record_batch(
70 &self,
71 records: usize,
72 uncompressed_bytes: usize,
73 compressed_bytes: usize,
74 compression_ns: u64,
75 ) {
76 self.inner.sent_batches.fetch_add(1, Ordering::Relaxed);
77 self.inner
78 .sent_records
79 .fetch_add(records as u64, Ordering::Relaxed);
80 self.inner
81 .sent_bytes_uncompressed
82 .fetch_add(uncompressed_bytes as u64, Ordering::Relaxed);
83 self.inner
84 .sent_bytes_compressed
85 .fetch_add(compressed_bytes as u64, Ordering::Relaxed);
86 self.inner
87 .compression_ns
88 .fetch_add(compression_ns, Ordering::Relaxed);
89 }
90
91 pub fn record_queue_depth(&self, queue_depth: usize) {
92 self.set_queue_depth(queue_depth);
93 }
94
95 pub fn record_replica_apply(&self, applied: bool, apply_ns: u64) {
96 if applied {
97 self.inner.replica_applied.fetch_add(1, Ordering::Relaxed);
98 } else {
99 self.inner.replica_skipped.fetch_add(1, Ordering::Relaxed);
100 }
101 self.inner
102 .replica_apply_ns
103 .fetch_add(apply_ns, Ordering::Relaxed);
104 }
105
106 pub fn record_replica_apply_batch(&self, applied: u64, skipped: u64, apply_ns: u64) {
107 self.inner
108 .replica_applied
109 .fetch_add(applied, Ordering::Relaxed);
110 self.inner
111 .replica_skipped
112 .fetch_add(skipped, Ordering::Relaxed);
113 self.inner
114 .replica_apply_ns
115 .fetch_add(apply_ns, Ordering::Relaxed);
116 }
117
118 pub fn record_backlog_catch_up(&self) {
119 self.inner
120 .catch_up_backlog_count
121 .fetch_add(1, Ordering::Relaxed);
122 }
123
124 pub fn record_snapshot_catch_up(&self) {
125 self.inner
126 .catch_up_snapshot_count
127 .fetch_add(1, Ordering::Relaxed);
128 }
129
130 pub fn snapshot(&self) -> ReplicationMetricsSnapshot {
131 ReplicationMetricsSnapshot {
132 emitted_mutations: self.inner.emitted_mutations.load(Ordering::Relaxed),
133 sent_batches: self.inner.sent_batches.load(Ordering::Relaxed),
134 sent_records: self.inner.sent_records.load(Ordering::Relaxed),
135 sent_bytes_compressed: self.inner.sent_bytes_compressed.load(Ordering::Relaxed),
136 sent_bytes_uncompressed: self.inner.sent_bytes_uncompressed.load(Ordering::Relaxed),
137 compression_ns: self.inner.compression_ns.load(Ordering::Relaxed),
138 queue_depth: self.inner.queue_depth.load(Ordering::Relaxed),
139 queue_high_watermark: self.inner.queue_high_watermark.load(Ordering::Relaxed),
140 drops: self.inner.drops.load(Ordering::Relaxed),
141 backpressure_events: self.inner.backpressure_events.load(Ordering::Relaxed),
142 replica_applied: self.inner.replica_applied.load(Ordering::Relaxed),
143 replica_skipped: self.inner.replica_skipped.load(Ordering::Relaxed),
144 replica_apply_ns: self.inner.replica_apply_ns.load(Ordering::Relaxed),
145 catch_up_backlog_count: self.inner.catch_up_backlog_count.load(Ordering::Relaxed),
146 catch_up_snapshot_count: self.inner.catch_up_snapshot_count.load(Ordering::Relaxed),
147 }
148 }
149
150 fn set_queue_depth(&self, queue_depth: usize) {
151 self.inner.queue_depth.store(queue_depth, Ordering::Relaxed);
152 let mut current = self.inner.queue_high_watermark.load(Ordering::Relaxed);
153 while queue_depth > current {
154 match self.inner.queue_high_watermark.compare_exchange_weak(
155 current,
156 queue_depth,
157 Ordering::Relaxed,
158 Ordering::Relaxed,
159 ) {
160 Ok(_) => break,
161 Err(next) => current = next,
162 }
163 }
164 }
165}