rs2_stream/
stream_performance_metrics.rs1use std::time::{Duration, Instant};
7
8#[derive(Debug, Clone)]
10pub struct BufferConfig {
11 pub initial_capacity: usize,
12 pub max_capacity: Option<usize>,
13 pub growth_strategy: GrowthStrategy,
14}
15
16impl Default for BufferConfig {
17 fn default() -> Self {
18 Self {
19 initial_capacity: 8192,
20 max_capacity: Some(1048576),
21 growth_strategy: GrowthStrategy::Exponential(1.5),
22 }
23 }
24}
25
/// Selects how a buffer's capacity is increased.
///
/// NOTE(review): the code that interprets these variants is not visible in this file
/// section; the per-variant descriptions below are assumptions to confirm against it.
#[derive(Debug)]
#[derive(Clone)]
pub enum GrowthStrategy {
    /// Carries a `usize` — presumably a fixed amount added on each growth step.
    Linear(usize),
    /// Carries an `f64` — presumably a factor the capacity is multiplied by on each growth step.
    Exponential(f64),
    /// Carries no data — presumably the capacity never changes.
    Fixed,
}
36
/// Counters and timing collected while a stream is processed.
///
/// A value built with [`StreamMetrics::new`] records its creation instant in `start_time`.
/// A value built with `Default` has `start_time == None`, so [`StreamMetrics::finalize`]
/// leaves `processing_time` unchanged for it.
#[derive(Debug, Clone, Default)]
pub struct StreamMetrics {
    /// Number of calls to [`StreamMetrics::record_item`] (saturating at `u64::MAX`).
    pub items_processed: u64,
    /// Sum of the `size_bytes` values passed to [`StreamMetrics::record_item`]
    /// (saturating at `u64::MAX`).
    pub bytes_processed: u64,
    /// Elapsed time captured by [`StreamMetrics::finalize`]; zero until then unless set directly.
    pub processing_time: Duration,
    /// Number of calls to [`StreamMetrics::record_error`] (saturating at `u64::MAX`).
    pub errors: u64,
    /// Instant at which measurement began; taken (set to `None`) by [`StreamMetrics::finalize`].
    pub start_time: Option<Instant>,
}

impl StreamMetrics {
    /// Creates zeroed metrics whose `start_time` is the current instant.
    pub fn new() -> Self {
        Self {
            start_time: Some(Instant::now()),
            ..Self::default()
        }
    }

    /// Records one processed item of `size_bytes` bytes.
    ///
    /// Counters saturate at `u64::MAX` instead of overflowing, so a long-running stream can
    /// neither panic (debug builds) nor silently wrap back towards zero (release builds).
    pub fn record_item(&mut self, size_bytes: u64) {
        self.items_processed = self.items_processed.saturating_add(1);
        self.bytes_processed = self.bytes_processed.saturating_add(size_bytes);
    }

    /// Records one error; the counter saturates at `u64::MAX`.
    pub fn record_error(&mut self) {
        self.errors = self.errors.saturating_add(1);
    }

    /// Stores the time elapsed since `start_time` into `processing_time`.
    ///
    /// `start_time` is taken in the process, so a second call — or a call on a value that
    /// never had a start time — is a no-op and keeps the previously stored duration.
    pub fn finalize(&mut self) {
        if let Some(started_at) = self.start_time.take() {
            self.processing_time = started_at.elapsed();
        }
    }

    /// Returns items processed per second of `processing_time`.
    ///
    /// Returns `0.0` while `processing_time` is zero (for example before
    /// [`StreamMetrics::finalize`] has stored a duration).
    pub fn throughput_items_per_sec(&self) -> f64 {
        self.per_second(self.items_processed)
    }

    /// Returns bytes processed per second of `processing_time`.
    ///
    /// Returns `0.0` while `processing_time` is zero (for example before
    /// [`StreamMetrics::finalize`] has stored a duration).
    pub fn throughput_bytes_per_sec(&self) -> f64 {
        self.per_second(self.bytes_processed)
    }

    /// Divides `count` by `processing_time` in seconds, yielding `0.0` for a zero duration
    /// so callers never see a division by zero (`inf`/`NaN`).
    fn per_second(&self, count: u64) -> f64 {
        let secs = self.processing_time.as_secs_f64();
        if secs > 0.0 {
            count as f64 / secs
        } else {
            0.0
        }
    }
}