1use crate::prelude::*;
18use crate::{Message, Receiver};
19use serde::de::DeserializeOwned;
20use serde::{Deserialize, Serialize};
21use std::collections::VecDeque;
22use std::sync::Arc;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24use tokio::sync::RwLock;
25
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
/// Panics (with an explanatory message) if the system clock reads earlier
/// than `UNIX_EPOCH`. The original bare `unwrap()` is replaced with an
/// `expect` that states the invariant being relied upon.
fn now_micros() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before UNIX_EPOCH")
        .as_micros() as u64 // truncating cast: u64 µs covers ~584 000 years
}
30
/// Thread-safe collector of latency, throughput, execution-time, and memory
/// metrics. Clones are cheap and share the same underlying state via `Arc`,
/// so a clone can be handed to every task that reports or reads metrics.
#[derive(Clone)]
pub struct PerformanceMetrics {
    // All trackers live behind a single async RwLock: `record_*` methods take
    // the write lock, `get_*` methods take the read lock.
    inner: Arc<RwLock<MetricsInner>>,
}
36
/// Mutable tracker state guarded by the `RwLock` inside `PerformanceMetrics`.
struct MetricsInner {
    latency_tracker: LatencyTracker,       // per-topic latency samples (µs)
    throughput_tracker: ThroughputTracker, // per-topic message/byte counters
    execution_tracker: ExecutionTracker,   // named execution-duration samples (µs)
    memory_tracker: MemoryTracker,         // global allocation counters
}
43
44impl PerformanceMetrics {
45 pub fn new() -> Self {
47 Self {
48 inner: Arc::new(RwLock::new(MetricsInner {
49 latency_tracker: LatencyTracker::new(),
50 throughput_tracker: ThroughputTracker::new(),
51 execution_tracker: ExecutionTracker::new(),
52 memory_tracker: MemoryTracker::new(),
53 })),
54 }
55 }
56
57 pub async fn record_latency(&self, topic: &str, latency_us: u64) {
59 let mut inner = self.inner.write().await;
60 inner.latency_tracker.record(topic, latency_us);
61 }
62
63 pub async fn record_message(&self, topic: &str, size_bytes: usize) {
65 let mut inner = self.inner.write().await;
66 inner.throughput_tracker.record(topic, size_bytes);
67 }
68
69 pub async fn start_timer(&self, name: &str) -> TimerGuard {
71 TimerGuard {
72 name: name.to_string(),
73 start: Instant::now(),
74 metrics: self.clone(),
75 }
76 }
77
78 pub async fn record_execution(&self, name: &str, duration: Duration) {
80 let mut inner = self.inner.write().await;
81 inner.execution_tracker.record(name, duration.as_micros() as u64);
82 }
83
84 pub async fn record_memory(&self, allocated_bytes: usize, freed_bytes: usize) {
86 let mut inner = self.inner.write().await;
87 inner.memory_tracker.record(allocated_bytes, freed_bytes);
88 }
89
90 pub async fn get_latency_stats(&self, topic: &str) -> Option<LatencyStats> {
92 let inner = self.inner.read().await;
93 inner.latency_tracker.get_stats(topic)
94 }
95
96 pub async fn get_throughput_stats(&self, topic: &str) -> Option<ThroughputStats> {
98 let inner = self.inner.read().await;
99 inner.throughput_tracker.get_stats(topic)
100 }
101
102 pub async fn get_execution_stats(&self, name: &str) -> Option<ExecutionStats> {
104 let inner = self.inner.read().await;
105 inner.execution_tracker.get_stats(name)
106 }
107
108 pub async fn get_memory_stats(&self) -> MemoryStats {
110 let inner = self.inner.read().await;
111 inner.memory_tracker.get_stats()
112 }
113
114 pub async fn get_report(&self) -> PerformanceReport {
116 let inner = self.inner.read().await;
117
118 PerformanceReport {
119 latency: inner.latency_tracker.get_all_stats(),
120 throughput: inner.throughput_tracker.get_all_stats(),
121 execution: inner.execution_tracker.get_all_stats(),
122 memory: inner.memory_tracker.get_stats(),
123 timestamp: now_micros(),
124 }
125 }
126
127 pub async fn reset(&self) {
129 let mut inner = self.inner.write().await;
130 inner.latency_tracker = LatencyTracker::new();
131 inner.throughput_tracker = ThroughputTracker::new();
132 inner.execution_tracker = ExecutionTracker::new();
133 inner.memory_tracker = MemoryTracker::new();
134 }
135}
136
137impl Default for PerformanceMetrics {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
/// Guard returned by `PerformanceMetrics::start_timer`; measures the time
/// from construction until it is dropped and records it as an execution
/// sample under `name` (see the `Drop` impl below).
pub struct TimerGuard {
    name: String,                // execution-stat key the sample is recorded under
    start: Instant,              // captured when the timer was started
    metrics: PerformanceMetrics, // shared handle used to record on drop
}
149
150impl Drop for TimerGuard {
151 fn drop(&mut self) {
152 let duration = self.start.elapsed();
153 let metrics = self.metrics.clone();
154 let name = self.name.clone();
155
156 tokio::spawn(async move {
157 metrics.record_execution(&name, duration).await;
158 });
159 }
160}
161
/// Per-topic latency samples (µs), keyed by topic name.
struct LatencyTracker {
    samples: std::collections::HashMap<String, SampleBuffer>,
}
166
167impl LatencyTracker {
168 fn new() -> Self {
169 Self {
170 samples: std::collections::HashMap::new(),
171 }
172 }
173
174 fn record(&mut self, topic: &str, latency_us: u64) {
175 self.samples
176 .entry(topic.to_string())
177 .or_insert_with(SampleBuffer::new)
178 .add(latency_us);
179 }
180
181 fn get_stats(&self, topic: &str) -> Option<LatencyStats> {
182 self.samples.get(topic).map(|buffer| LatencyStats {
183 topic: topic.to_string(),
184 count: buffer.count(),
185 min_us: buffer.min(),
186 max_us: buffer.max(),
187 mean_us: buffer.mean(),
188 p50_us: buffer.percentile(0.50),
189 p95_us: buffer.percentile(0.95),
190 p99_us: buffer.percentile(0.99),
191 })
192 }
193
194 fn get_all_stats(&self) -> Vec<LatencyStats> {
195 self.samples.keys().filter_map(|topic| self.get_stats(topic)).collect()
196 }
197}
198
/// Per-topic message/byte counters, keyed by topic name.
struct ThroughputTracker {
    data: std::collections::HashMap<String, ThroughputData>,
}

/// Raw counters for one topic; rates are derived on demand in `get_stats`.
#[derive(Debug, Clone)]
struct ThroughputData {
    message_count: u64,  // messages recorded since `start_time`
    byte_count: u64,     // payload bytes recorded since `start_time`
    start_time: Instant, // set when the topic's first message was recorded
}
210
211impl ThroughputTracker {
212 fn new() -> Self {
213 Self {
214 data: std::collections::HashMap::new(),
215 }
216 }
217
218 fn record(&mut self, topic: &str, size_bytes: usize) {
219 let entry = self.data.entry(topic.to_string()).or_insert(ThroughputData {
220 message_count: 0,
221 byte_count: 0,
222 start_time: Instant::now(),
223 });
224
225 entry.message_count += 1;
226 entry.byte_count += size_bytes as u64;
227 }
228
229 fn get_stats(&self, topic: &str) -> Option<ThroughputStats> {
230 self.data.get(topic).map(|data| {
231 let elapsed = data.start_time.elapsed().as_secs_f64();
232 let messages_per_sec = if elapsed > 0.0 {
233 data.message_count as f64 / elapsed
234 } else {
235 0.0
236 };
237 let bytes_per_sec = if elapsed > 0.0 {
238 data.byte_count as f64 / elapsed
239 } else {
240 0.0
241 };
242
243 ThroughputStats {
244 topic: topic.to_string(),
245 message_count: data.message_count,
246 byte_count: data.byte_count,
247 duration_sec: elapsed,
248 messages_per_sec,
249 bytes_per_sec,
250 megabytes_per_sec: bytes_per_sec / 1_000_000.0,
251 }
252 })
253 }
254
255 fn get_all_stats(&self) -> Vec<ThroughputStats> {
256 self.data.keys().filter_map(|topic| self.get_stats(topic)).collect()
257 }
258}
259
/// Execution-duration samples (µs), keyed by operation name.
struct ExecutionTracker {
    samples: std::collections::HashMap<String, SampleBuffer>,
}
264
265impl ExecutionTracker {
266 fn new() -> Self {
267 Self {
268 samples: std::collections::HashMap::new(),
269 }
270 }
271
272 fn record(&mut self, name: &str, duration_us: u64) {
273 self.samples
274 .entry(name.to_string())
275 .or_insert_with(SampleBuffer::new)
276 .add(duration_us);
277 }
278
279 fn get_stats(&self, name: &str) -> Option<ExecutionStats> {
280 self.samples.get(name).map(|buffer| ExecutionStats {
281 name: name.to_string(),
282 count: buffer.count(),
283 min_us: buffer.min(),
284 max_us: buffer.max(),
285 mean_us: buffer.mean(),
286 p50_us: buffer.percentile(0.50),
287 p95_us: buffer.percentile(0.95),
288 p99_us: buffer.percentile(0.99),
289 })
290 }
291
292 fn get_all_stats(&self) -> Vec<ExecutionStats> {
293 self.samples.keys().filter_map(|name| self.get_stats(name)).collect()
294 }
295}
296
/// Cumulative allocation counters fed by `PerformanceMetrics::record_memory`.
struct MemoryTracker {
    total_allocated: usize, // lifetime bytes reported as allocated
    total_freed: usize,     // lifetime bytes reported as freed
    current_usage: usize,   // allocated minus freed (saturating)
    peak_usage: usize,      // high-water mark of `current_usage`
}
304
305impl MemoryTracker {
306 fn new() -> Self {
307 Self {
308 total_allocated: 0,
309 total_freed: 0,
310 current_usage: 0,
311 peak_usage: 0,
312 }
313 }
314
315 fn record(&mut self, allocated: usize, freed: usize) {
316 self.total_allocated += allocated;
317 self.total_freed += freed;
318 self.current_usage = self.current_usage.saturating_add(allocated).saturating_sub(freed);
319
320 if self.current_usage > self.peak_usage {
321 self.peak_usage = self.current_usage;
322 }
323 }
324
325 fn get_stats(&self) -> MemoryStats {
326 MemoryStats {
327 total_allocated_bytes: self.total_allocated,
328 total_freed_bytes: self.total_freed,
329 current_usage_bytes: self.current_usage,
330 peak_usage_bytes: self.peak_usage,
331 current_usage_mb: self.current_usage as f64 / 1_000_000.0,
332 peak_usage_mb: self.peak_usage as f64 / 1_000_000.0,
333 }
334 }
335}
336
/// Bounded buffer of the most recent `u64` samples, used for latency and
/// execution statistics. Once `max_samples` is reached, the oldest sample
/// is evicted on each new insertion.
struct SampleBuffer {
    samples: VecDeque<u64>,
    max_samples: usize,
}

impl SampleBuffer {
    /// Creates an empty buffer that retains at most 10 000 samples.
    fn new() -> Self {
        SampleBuffer {
            samples: VecDeque::new(),
            max_samples: 10000,
        }
    }

    /// Pushes `value`, evicting the oldest sample when the buffer is full.
    fn add(&mut self, value: u64) {
        while self.samples.len() >= self.max_samples {
            self.samples.pop_front();
        }
        self.samples.push_back(value);
    }

    /// Number of retained samples.
    fn count(&self) -> u64 {
        self.samples.len() as u64
    }

    /// Smallest retained sample, or 0 when empty.
    fn min(&self) -> u64 {
        self.samples.iter().copied().min().unwrap_or(0)
    }

    /// Largest retained sample, or 0 when empty.
    fn max(&self) -> u64 {
        self.samples.iter().copied().max().unwrap_or(0)
    }

    /// Arithmetic mean of the retained samples, or 0.0 when empty.
    fn mean(&self) -> f64 {
        match self.samples.len() {
            0 => 0.0,
            n => self.samples.iter().sum::<u64>() as f64 / n as f64,
        }
    }

    /// Percentile over the sorted retained samples using a floor index
    /// (`p` in [0, 1], e.g. 0.99 for p99); returns 0 when empty.
    fn percentile(&self, p: f64) -> u64 {
        let mut sorted: Vec<u64> = self.samples.iter().copied().collect();
        if sorted.is_empty() {
            return 0;
        }
        sorted.sort_unstable();
        let idx = ((sorted.len() - 1) as f64 * p) as usize;
        sorted[idx]
    }
}
390
/// Summary of one topic's latency samples. All times are in microseconds and
/// are computed over at most the last 10 000 samples (see `SampleBuffer`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyStats {
    pub topic: String,
    pub count: u64,   // number of retained samples
    pub min_us: u64,
    pub max_us: u64,
    pub mean_us: f64,
    pub p50_us: u64,  // median (floor-index percentile)
    pub p95_us: u64,
    pub p99_us: u64,
}

impl Message for LatencyStats {}
405
/// Summary of one topic's throughput, averaged over the window starting at
/// the topic's first recorded message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputStats {
    pub topic: String,
    pub message_count: u64,     // messages recorded in the window
    pub byte_count: u64,        // payload bytes recorded in the window
    pub duration_sec: f64,      // window length in seconds
    pub messages_per_sec: f64,  // 0.0 when the window has zero length
    pub bytes_per_sec: f64,     // 0.0 when the window has zero length
    pub megabytes_per_sec: f64, // bytes_per_sec / 1e6 (decimal MB)
}

impl Message for ThroughputStats {}
419
/// Summary of one named operation's execution durations. All times are in
/// microseconds, computed over at most the last 10 000 samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
    pub name: String, // operation name passed to `record_execution`/`start_timer`
    pub count: u64,   // number of retained samples
    pub min_us: u64,
    pub max_us: u64,
    pub mean_us: f64,
    pub p50_us: u64,  // median (floor-index percentile)
    pub p95_us: u64,
    pub p99_us: u64,
}

impl Message for ExecutionStats {}
434
/// Snapshot of the cumulative memory counters reported via `record_memory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    pub total_allocated_bytes: usize, // lifetime bytes reported allocated
    pub total_freed_bytes: usize,     // lifetime bytes reported freed
    pub current_usage_bytes: usize,   // allocated minus freed (saturating)
    pub peak_usage_bytes: usize,      // high-water mark of current usage
    pub current_usage_mb: f64,        // current_usage_bytes / 1e6 (decimal MB)
    pub peak_usage_mb: f64,           // peak_usage_bytes / 1e6 (decimal MB)
}

impl Message for MemoryStats {}
447
/// Full metrics snapshot produced by `PerformanceMetrics::get_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    pub latency: Vec<LatencyStats>,       // one entry per tracked topic
    pub throughput: Vec<ThroughputStats>, // one entry per tracked topic
    pub execution: Vec<ExecutionStats>,   // one entry per tracked operation
    pub memory: MemoryStats,
    pub timestamp: u64, // µs since the Unix epoch, when the report was built
}

impl Message for PerformanceReport {}
459
460pub fn measure_latency(timestamp_us: u64) -> u64 {
462 let now = now_micros();
463 now.saturating_sub(timestamp_us)
464}
465
/// Extension trait adding metrics-aware publish/subscribe operations.
/// Only the contract is declared here; implementations live elsewhere in
/// the crate and are expected to report into `metrics()`.
pub trait ProfilingExt {
    /// Shared metrics collector used by the profiled operations.
    fn metrics(&self) -> &PerformanceMetrics;

    /// Publishes `message` on `topic`. Presumably records throughput and/or
    /// latency via `metrics()` — implementation not visible here; confirm.
    #[allow(async_fn_in_trait)]
    async fn publish_profiled<T: Message + Serialize>(&self, topic: &str, message: &T) -> Result<()>;

    /// Subscribes to `topic`, returning a typed receiver. Profiling behavior
    /// is implementation-defined.
    #[allow(async_fn_in_trait)]
    async fn subscribe_profiled<T: Message + DeserializeOwned + Send + 'static>(
        &self,
        topic: &str,
    ) -> Result<Receiver<T>>;
}
482
483