rs2_stream/
stream_performance_metrics.rs1use std::time::{Duration, Instant};
7
8#[derive(Debug, Clone)]
10pub struct BufferConfig {
11 pub initial_capacity: usize,
12 pub max_capacity: Option<usize>,
13 pub growth_strategy: GrowthStrategy,
14}
15
16impl Default for BufferConfig {
17 fn default() -> Self {
18 Self {
19 initial_capacity: 8192,
20 max_capacity: Some(1048576),
21 growth_strategy: GrowthStrategy::Exponential(1.5),
22 }
23 }
24}
25
/// How a buffer's capacity changes when it needs more room.
///
/// NOTE(review): the code that interprets these variants is not part of this file; the
/// per-variant descriptions below are inferred from the names and payload types and
/// should be confirmed against that code.
#[derive(Debug, Clone)]
pub enum GrowthStrategy {
    /// Presumably grows by a fixed step; the payload would be the step size.
    Linear(usize),
    /// Presumably grows by a multiplicative factor; `BufferConfig::default` uses `1.5`.
    Exponential(f64),
    /// Presumably never grows beyond the initial capacity.
    Fixed,
}
36
37
/// Limits that decide whether a stream counts as healthy.
///
/// `StreamMetrics::is_healthy` compares with strict `<`, so a stream whose error rate or
/// consecutive-error count has *reached* a limit is already reported as unhealthy.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Error-rate limit, expressed as a fraction of attempts (e.g. `0.1` for 10 %).
    pub max_error_rate: f64,
    /// Limit on the number of errors recorded back to back.
    pub max_consecutive_errors: u64,
}

impl Default for HealthThresholds {
    /// Returns the stock limits: error rate `0.1`, `5` consecutive errors.
    fn default() -> Self {
        Self {
            max_error_rate: 0.1,
            max_consecutive_errors: 5,
        }
    }
}

impl HealthThresholds {
    /// Tight preset: error rate `0.01`, `2` consecutive errors.
    pub fn strict() -> Self {
        Self {
            max_error_rate: 0.01,
            max_consecutive_errors: 2,
        }
    }

    /// Lenient preset: error rate `0.20`, `20` consecutive errors.
    pub fn relaxed() -> Self {
        Self {
            max_error_rate: 0.20,
            max_consecutive_errors: 20,
        }
    }

    /// Builds thresholds from caller-supplied limits.
    ///
    /// The values are stored as given; no range validation is performed.
    pub fn custom(max_error_rate: f64, max_consecutive_errors: u64) -> Self {
        Self {
            max_error_rate,
            max_consecutive_errors,
        }
    }
}
78
79#[derive(Debug, Clone, Default)]
81pub struct StreamMetrics {
82 pub name: Option<String>,
83 pub items_processed: u64,
84 pub bytes_processed: u64,
85 pub processing_time: Duration,
86 pub errors: u64,
87 pub start_time: Option<Instant>,
88
89 pub retries: u64,
90 pub items_per_second: f64,
91 pub bytes_per_second: f64,
92 pub average_item_size: f64,
93 pub peak_processing_time: Duration,
94 pub last_activity: Option<Instant>,
95 pub consecutive_errors: u64,
96 pub error_rate: f64,
97 pub backpressure_events: u64,
98 pub queue_depth: u64,
99 pub health_thresholds: HealthThresholds
100}
101
102impl StreamMetrics {
103 pub fn new() -> Self {
104 let timestamp = std::time::SystemTime::now()
105 .duration_since(std::time::UNIX_EPOCH)
106 .unwrap()
107 .as_millis();
108
109 Self {
110 name: Some(format!("rs2-stream-{}", timestamp)),
111 start_time: Some(Instant::now()),
112 ..Default::default()
113 }
114 }
115
116 pub fn with_name(mut self, name: String) -> Self {
117 self.name = Some(name);
118 self
119 }
120 pub fn set_name(&mut self, name: String) {
121 self.name = Some(name);
122 }
123
124 pub fn with_health_thresholds(mut self, thresholds: HealthThresholds) -> Self {
125 self.health_thresholds = thresholds;
126 self
127 }
128
129 pub fn set_health_thresholds(&mut self, thresholds: HealthThresholds) {
130 self.health_thresholds = thresholds;
131 }
132
133 pub fn record_item(&mut self, size_bytes: u64) {
134 self.items_processed += 1;
135 self.bytes_processed += size_bytes;
136 self.last_activity = Some(Instant::now());
137 self.consecutive_errors = 0;
138 self.update_derived_metrics();
139
140 }
141
142 pub fn record_error(&mut self) {
143
144 self.errors += 1;
145 self.consecutive_errors += 1;
146 self.last_activity = Some(Instant::now());
147 self.update_derived_metrics();
148
149 }
150
151 pub fn record_retry(&mut self) {
152 self.retries += 1;
153 }
154
155 pub fn record_processing_time(&mut self, duration: Duration) {
156 self.processing_time += duration;
157 if duration > self.peak_processing_time {
158 self.peak_processing_time = duration;
159 }
160 }
161
162 pub fn record_backpressure(&mut self) {
163 self.backpressure_events += 1;
164 }
165
166 pub fn update_queue_depth(&mut self, depth: u64) {
167 self.queue_depth = depth;
168 }
169
170 pub fn finalize(&mut self) {
171 if let Some(start) = self.start_time.take() {
172 self.processing_time = start.elapsed();
173 }
174 self.update_derived_metrics();
175
176 }
177
178 pub fn throughput_items_per_sec(&self) -> f64 {
179 if self.processing_time.as_secs_f64() > 0.0 {
180 self.items_processed as f64 / self.processing_time.as_secs_f64()
181 } else {
182 0.0
183 }
184 }
185
186 pub fn throughput_bytes_per_sec(&self) -> f64 {
187 if self.processing_time.as_secs_f64() > 0.0 {
188 self.bytes_processed as f64 / self.processing_time.as_secs_f64()
189 } else {
190 0.0
191 }
192 }
193
194 pub fn update_derived_metrics(&mut self) {
195 if let Some(start) = self.start_time {
196 let elapsed_secs = start.elapsed().as_secs_f64();
197 if elapsed_secs > 0.0 {
198 self.items_per_second = self.items_processed as f64 / elapsed_secs;
199 self.bytes_per_second = self.bytes_processed as f64 / elapsed_secs;
200 }
201 }
202
203 if self.items_processed > 0 {
204 self.average_item_size = self.bytes_processed as f64 / self.items_processed as f64;
205 }
206
207 let total_attempts = self.items_processed + self.errors;
208 if total_attempts > 0 {
209 self.error_rate = self.errors as f64 / total_attempts as f64;
210 }
211 }
212
213 pub fn is_healthy(&self) -> bool {
214 self.error_rate < self.health_thresholds.max_error_rate
215 && self.consecutive_errors < self.health_thresholds.max_consecutive_errors
216
217 }
218
219 pub fn throughput_summary(&self) -> String {
220 format!("{:.1} items/sec, {:.1} KB/sec",
221 self.items_per_second,
222 self.bytes_per_second / 1000.0)
223 }
224
225 pub fn throughput_summary_processing_time(&self) -> String {
226 format!("{:.1} items/sec, {:.1} KB/sec",
227 self.throughput_items_per_sec(),
228 self.throughput_bytes_per_sec() / 1000.0)
229 }
230
231}