rs2_stream/
stream_performance_metrics.rs1use std::time::{Duration, Instant};
7
8#[derive(Debug, Clone)]
10pub struct BufferConfig {
11 pub initial_capacity: usize,
12 pub max_capacity: Option<usize>,
13 pub growth_strategy: GrowthStrategy,
14}
15
16impl Default for BufferConfig {
17 fn default() -> Self {
18 Self {
19 initial_capacity: 8192,
20 max_capacity: Some(1048576),
21 growth_strategy: GrowthStrategy::Exponential(1.5),
22 }
23 }
24}
25
/// Selects how a buffer configured through `BufferConfig` grows.
///
/// NOTE(review): the growth logic itself is not part of this file section;
/// the per-variant descriptions below are inferred from the variant names and
/// payload types and should be confirmed against the code that applies them.
#[derive(Debug, Clone)]
pub enum GrowthStrategy {
    /// Carries a `usize` — presumably a fixed increment added on each growth
    /// step (TODO confirm).
    Linear(usize),

    /// Carries an `f64` — presumably a multiplier applied on each growth step
    /// (TODO confirm). The default `BufferConfig` uses `Exponential(1.5)`.
    Exponential(f64),

    /// Carries no payload — presumably the buffer never grows (TODO confirm).
    Fixed,
}
36
/// Limits consulted by `StreamMetrics::is_healthy` to decide whether a stream
/// is still considered healthy.
///
/// Both comparisons in `is_healthy` are strict (`<`), so a stream whose value
/// *equals* a limit is already reported as unhealthy.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Limit for `StreamMetrics::error_rate`, which is computed as
    /// `errors / (items_processed + errors)` and therefore is a fraction.
    pub max_error_rate: f64,

    /// Limit for `StreamMetrics::consecutive_errors`, the number of errors
    /// recorded since the last successfully recorded item.
    pub max_consecutive_errors: u64,
}
42
43impl Default for HealthThresholds {
44 fn default() -> Self {
45 Self {
46 max_error_rate: 0.1, max_consecutive_errors: 5, }
49 }
50}
51
52impl HealthThresholds {
53 pub fn strict() -> Self {
55 Self {
56 max_error_rate: 0.01, max_consecutive_errors: 2, }
59 }
60
61 pub fn relaxed() -> Self {
63 Self {
64 max_error_rate: 0.20, max_consecutive_errors: 20, }
67 }
68
69 pub fn custom(max_error_rate: f64, max_consecutive_errors: u64) -> Self {
71 Self {
72 max_error_rate,
73 max_consecutive_errors,
74 }
75 }
76}
77
78#[derive(Debug, Clone, Default)]
80pub struct StreamMetrics {
81 pub name: Option<String>,
82 pub items_processed: u64,
83 pub bytes_processed: u64,
84 pub processing_time: Duration,
85 pub errors: u64,
86 pub start_time: Option<Instant>,
87
88 pub retries: u64,
89 pub items_per_second: f64,
90 pub bytes_per_second: f64,
91 pub average_item_size: f64,
92 pub peak_processing_time: Duration,
93 pub last_activity: Option<Instant>,
94 pub consecutive_errors: u64,
95 pub error_rate: f64,
96 pub backpressure_events: u64,
97 pub queue_depth: u64,
98 pub health_thresholds: HealthThresholds,
99}
100
101impl StreamMetrics {
102 pub fn new() -> Self {
103 let timestamp = std::time::SystemTime::now()
104 .duration_since(std::time::UNIX_EPOCH)
105 .unwrap()
106 .as_millis();
107
108 Self {
109 name: Some(format!("rs2-stream-{}", timestamp)),
110 start_time: Some(Instant::now()),
111 ..Default::default()
112 }
113 }
114
115 pub fn with_name(mut self, name: String) -> Self {
116 self.name = Some(name);
117 self
118 }
119 pub fn set_name(&mut self, name: String) {
120 self.name = Some(name);
121 }
122
123 pub fn with_health_thresholds(mut self, thresholds: HealthThresholds) -> Self {
124 self.health_thresholds = thresholds;
125 self
126 }
127
128 pub fn set_health_thresholds(&mut self, thresholds: HealthThresholds) {
129 self.health_thresholds = thresholds;
130 }
131
132 pub fn record_item(&mut self, size_bytes: u64) {
133 self.items_processed += 1;
134 self.bytes_processed += size_bytes;
135 self.last_activity = Some(Instant::now());
136 self.consecutive_errors = 0;
137 self.update_derived_metrics();
138 }
139
140 pub fn record_error(&mut self) {
141 self.errors += 1;
142 self.consecutive_errors += 1;
143 self.last_activity = Some(Instant::now());
144 self.update_derived_metrics();
145 }
146
147 pub fn record_retry(&mut self) {
148 self.retries += 1;
149 }
150
151 pub fn record_processing_time(&mut self, duration: Duration) {
152 self.processing_time += duration;
153 if duration > self.peak_processing_time {
154 self.peak_processing_time = duration;
155 }
156 }
157
158 pub fn record_backpressure(&mut self) {
159 self.backpressure_events += 1;
160 }
161
162 pub fn update_queue_depth(&mut self, depth: u64) {
163 self.queue_depth = depth;
164 }
165
166 pub fn finalize(&mut self) {
167 if let Some(start) = self.start_time.take() {
168 self.processing_time = start.elapsed();
169 }
170 self.update_derived_metrics();
171 }
172
173 pub fn throughput_items_per_sec(&self) -> f64 {
174 if self.processing_time.as_secs_f64() > 0.0 {
175 self.items_processed as f64 / self.processing_time.as_secs_f64()
176 } else {
177 0.0
178 }
179 }
180
181 pub fn throughput_bytes_per_sec(&self) -> f64 {
182 if self.processing_time.as_secs_f64() > 0.0 {
183 self.bytes_processed as f64 / self.processing_time.as_secs_f64()
184 } else {
185 0.0
186 }
187 }
188
189 pub fn update_derived_metrics(&mut self) {
190 if let Some(start) = self.start_time {
191 let elapsed_secs = start.elapsed().as_secs_f64();
192 if elapsed_secs > 0.0 {
193 self.items_per_second = self.items_processed as f64 / elapsed_secs;
194 self.bytes_per_second = self.bytes_processed as f64 / elapsed_secs;
195 }
196 }
197
198 if self.items_processed > 0 {
199 self.average_item_size = self.bytes_processed as f64 / self.items_processed as f64;
200 }
201
202 let total_attempts = self.items_processed + self.errors;
203 if total_attempts > 0 {
204 self.error_rate = self.errors as f64 / total_attempts as f64;
205 }
206 }
207
208 pub fn is_healthy(&self) -> bool {
209 self.error_rate < self.health_thresholds.max_error_rate
210 && self.consecutive_errors < self.health_thresholds.max_consecutive_errors
211 }
212
213 pub fn throughput_summary(&self) -> String {
214 format!(
215 "{:.1} items/sec, {:.1} KB/sec",
216 self.items_per_second,
217 self.bytes_per_second / 1000.0
218 )
219 }
220
221 pub fn throughput_summary_processing_time(&self) -> String {
222 format!(
223 "{:.1} items/sec, {:.1} KB/sec",
224 self.throughput_items_per_sec(),
225 self.throughput_bytes_per_sec() / 1000.0
226 )
227 }
228}