1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
/// A bundle of atomic counters describing logging activity.
///
/// Every counter sits behind its own [`Arc`], so the derived `Clone` only
/// copies the handles: all clones read and update the same underlying values.
#[derive(Clone, Debug)]
pub struct LoggingMetrics {
    /// Count of processed entries; advanced by `inc_entries_processed`.
    pub entries_processed: Arc<AtomicU64>,

    /// Count of published entries; advanced by `inc_entries_published`.
    pub entries_published: Arc<AtomicU64>,

    /// Count of failed entries; advanced by `inc_entries_failed`.
    pub entries_failed: Arc<AtomicU64>,

    /// Count of rate-limited entries; advanced by `inc_entries_rate_limited`.
    pub entries_rate_limited: Arc<AtomicU64>,

    /// Count of buffer-overflow events; advanced by `inc_entries_buffer_overflow`.
    pub entries_buffer_overflow: Arc<AtomicU64>,

    /// Running total of published bytes; grows by the amount handed to
    /// `add_bytes_published`.
    pub bytes_published: Arc<AtomicU64>,

    /// Count of publish attempts; advanced by `inc_publish_attempts`.
    pub publish_attempts: Arc<AtomicU64>,

    /// Count of publish retries; advanced by `inc_publish_retries`.
    pub publish_retries: Arc<AtomicU64>,

    /// Most recently recorded buffer size; overwritten (not accumulated) by
    /// `set_buffer_size`.
    pub buffer_size: Arc<AtomicU64>,
}
36
37impl Default for LoggingMetrics {
38 fn default() -> Self {
39 Self {
40 entries_processed: Arc::new(AtomicU64::new(0)),
41 entries_published: Arc::new(AtomicU64::new(0)),
42 entries_failed: Arc::new(AtomicU64::new(0)),
43 entries_rate_limited: Arc::new(AtomicU64::new(0)),
44 entries_buffer_overflow: Arc::new(AtomicU64::new(0)),
45 bytes_published: Arc::new(AtomicU64::new(0)),
46 publish_attempts: Arc::new(AtomicU64::new(0)),
47 publish_retries: Arc::new(AtomicU64::new(0)),
48 buffer_size: Arc::new(AtomicU64::new(0)),
49 }
50 }
51}
52
53impl LoggingMetrics {
54 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn inc_entries_processed(&self) {
61 self.entries_processed.fetch_add(1, Ordering::Relaxed);
62 }
63
64 pub fn inc_entries_published(&self) {
66 self.entries_published.fetch_add(1, Ordering::Relaxed);
67 }
68
69 pub fn inc_entries_failed(&self) {
71 self.entries_failed.fetch_add(1, Ordering::Relaxed);
72 }
73
74 pub fn inc_entries_rate_limited(&self) {
76 self.entries_rate_limited.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn inc_entries_buffer_overflow(&self) {
81 self.entries_buffer_overflow.fetch_add(1, Ordering::Relaxed);
82 }
83
84 pub fn add_bytes_published(&self, bytes: u64) {
86 self.bytes_published.fetch_add(bytes, Ordering::Relaxed);
87 }
88
89 pub fn inc_publish_attempts(&self) {
91 self.publish_attempts.fetch_add(1, Ordering::Relaxed);
92 }
93
94 pub fn inc_publish_retries(&self) {
96 self.publish_retries.fetch_add(1, Ordering::Relaxed);
97 }
98
99 pub fn set_buffer_size(&self, size: u64) {
101 self.buffer_size.store(size, Ordering::Relaxed);
102 }
103
104 pub fn get_entries_processed(&self) -> u64 {
106 self.entries_processed.load(Ordering::Relaxed)
107 }
108
109 pub fn get_entries_published(&self) -> u64 {
111 self.entries_published.load(Ordering::Relaxed)
112 }
113
114 pub fn get_entries_failed(&self) -> u64 {
116 self.entries_failed.load(Ordering::Relaxed)
117 }
118
119 pub fn get_entries_rate_limited(&self) -> u64 {
121 self.entries_rate_limited.load(Ordering::Relaxed)
122 }
123
124 pub fn get_entries_buffer_overflow(&self) -> u64 {
126 self.entries_buffer_overflow.load(Ordering::Relaxed)
127 }
128
129 pub fn get_bytes_published(&self) -> u64 {
131 self.bytes_published.load(Ordering::Relaxed)
132 }
133
134 pub fn get_publish_attempts(&self) -> u64 {
136 self.publish_attempts.load(Ordering::Relaxed)
137 }
138
139 pub fn get_publish_retries(&self) -> u64 {
141 self.publish_retries.load(Ordering::Relaxed)
142 }
143
144 pub fn get_buffer_size(&self) -> u64 {
146 self.buffer_size.load(Ordering::Relaxed)
147 }
148
149 pub fn get_success_rate(&self) -> f64 {
151 let published = self.get_entries_published();
152 let total = self.get_entries_processed();
153
154 if total == 0 {
155 0.0
156 } else {
157 (published as f64 / total as f64) * 100.0
158 }
159 }
160
161 pub fn get_avg_bytes_per_entry(&self) -> f64 {
163 let bytes = self.get_bytes_published();
164 let entries = self.get_entries_published();
165
166 if entries == 0 {
167 0.0
168 } else {
169 bytes as f64 / entries as f64
170 }
171 }
172
173 pub fn get_summary(&self) -> LoggingMetricsSummary {
175 LoggingMetricsSummary {
176 entries_processed: self.get_entries_processed(),
177 entries_published: self.get_entries_published(),
178 entries_failed: self.get_entries_failed(),
179 entries_rate_limited: self.get_entries_rate_limited(),
180 entries_buffer_overflow: self.get_entries_buffer_overflow(),
181 bytes_published: self.get_bytes_published(),
182 publish_attempts: self.get_publish_attempts(),
183 publish_retries: self.get_publish_retries(),
184 current_buffer_size: self.get_buffer_size(),
185 success_rate: self.get_success_rate(),
186 avg_bytes_per_entry: self.get_avg_bytes_per_entry(),
187 }
188 }
189}
190
/// Plain-value copy of the logging counters plus two derived figures.
///
/// Holds ordinary integers and floats, so it does not change after it has
/// been built. Produced by `LoggingMetrics::get_summary`.
#[derive(Clone, Debug)]
pub struct LoggingMetricsSummary {
    /// Processed-entries count.
    pub entries_processed: u64,
    /// Published-entries count.
    pub entries_published: u64,
    /// Failed-entries count.
    pub entries_failed: u64,
    /// Rate-limited-entries count.
    pub entries_rate_limited: u64,
    /// Buffer-overflow count.
    pub entries_buffer_overflow: u64,
    /// Total published bytes.
    pub bytes_published: u64,
    /// Publish-attempts count.
    pub publish_attempts: u64,
    /// Publish-retries count.
    pub publish_retries: u64,
    /// Recorded buffer size.
    pub current_buffer_size: u64,
    /// Published entries as a percentage of processed entries, as filled in
    /// by `LoggingMetrics::get_summary`.
    pub success_rate: f64,
    /// Mean bytes per published entry, as filled in by
    /// `LoggingMetrics::get_summary`.
    pub avg_bytes_per_entry: f64,
}
206
207impl std::fmt::Display for LoggingMetricsSummary {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 write!(f, "LoggingMetrics {{ processed: {}, published: {}, failed: {}, rate_limited: {}, buffer_overflow: {}, success_rate: {:.2}%, avg_bytes: {:.1} }}",
210 self.entries_processed,
211 self.entries_published,
212 self.entries_failed,
213 self.entries_rate_limited,
214 self.entries_buffer_overflow,
215 self.success_rate,
216 self.avg_bytes_per_entry)
217 }
218}