rs2_stream/stream_performance_metrics.rs

//! Performance optimization utilities for RStream
//!
//! This module provides buffering strategies, metrics collection,
//! and performance monitoring tools.

use std::time::{Duration, Instant};

/// Buffer configuration for optimal performance
#[derive(Debug, Clone)]
pub struct BufferConfig {
    pub initial_capacity: usize,
    pub max_capacity: Option<usize>,
    pub growth_strategy: GrowthStrategy,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            initial_capacity: 8192,
            max_capacity: Some(1_048_576),
            growth_strategy: GrowthStrategy::Exponential(1.5),
        }
    }
}

/// Strategy for growing buffers
#[derive(Debug, Clone)]
pub enum GrowthStrategy {
    /// Grow linearly by a fixed amount
    Linear(usize),
    /// Grow exponentially by a multiplier
    Exponential(f64),
    /// Fixed size, don't grow
    Fixed,
}
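
// Illustrative sketch only (not part of the original API): one way a buffer
// might apply `GrowthStrategy` when it runs out of room, clamping the result
// to `BufferConfig::max_capacity`. The name `grow_capacity` is hypothetical.
#[allow(dead_code)]
fn grow_capacity(config: &BufferConfig, current: usize) -> usize {
    let grown = match config.growth_strategy {
        // Add a fixed number of bytes.
        GrowthStrategy::Linear(step) => current.saturating_add(step),
        // Multiply the current capacity by the configured factor.
        GrowthStrategy::Exponential(factor) => ((current as f64) * factor).ceil() as usize,
        // Never grow beyond the current allocation.
        GrowthStrategy::Fixed => current,
    };
    config.max_capacity.map_or(grown, |max| grown.min(max))
}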

/// Thresholds used to decide whether a stream is still healthy
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    pub max_error_rate: f64,
    pub max_consecutive_errors: u64,
}

impl Default for HealthThresholds {
    fn default() -> Self {
        Self {
            max_error_rate: 0.1,       // 10% error rate
            max_consecutive_errors: 5, // 5 consecutive errors
        }
    }
}

impl HealthThresholds {
    /// Conservative thresholds for critical systems
    pub fn strict() -> Self {
        Self {
            max_error_rate: 0.01,      // 1% error rate
            max_consecutive_errors: 2, // 2 consecutive errors
        }
    }

    /// Permissive thresholds for high-throughput systems
    pub fn relaxed() -> Self {
        Self {
            max_error_rate: 0.20,       // 20% error rate
            max_consecutive_errors: 20, // 20 consecutive errors
        }
    }

    /// Custom thresholds
    pub fn custom(max_error_rate: f64, max_consecutive_errors: u64) -> Self {
        Self {
            max_error_rate,
            max_consecutive_errors,
        }
    }
}
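
// Usage sketch (illustrative only; the 5% / 10 values are made up): pick a
// preset for a critical path and custom thresholds for a bulk pipeline.
#[allow(dead_code)]
fn example_thresholds(critical: bool) -> HealthThresholds {
    if critical {
        // Trip health checks early on sensitive streams.
        HealthThresholds::strict()
    } else {
        // Tolerate transient errors on bulk ingestion: 5% error rate, 10 in a row.
        HealthThresholds::custom(0.05, 10)
    }
}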

/// Metrics collected for rs2_stream operations
#[derive(Debug, Clone, Default)]
pub struct StreamMetrics {
    pub name: Option<String>,
    pub items_processed: u64,
    pub bytes_processed: u64,
    pub processing_time: Duration,
    pub errors: u64,
    pub start_time: Option<Instant>,

    pub retries: u64,
    pub items_per_second: f64,
    pub bytes_per_second: f64,
    pub average_item_size: f64,
    pub peak_processing_time: Duration,
    pub last_activity: Option<Instant>,
    pub consecutive_errors: u64,
    pub error_rate: f64,
    pub backpressure_events: u64,
    pub queue_depth: u64,
    pub health_thresholds: HealthThresholds,
}
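
// Illustrative sketch (the stream name is made up): building a named metrics
// instance with strict health thresholds via the builder-style methods below.
#[allow(dead_code)]
fn example_metrics_for_critical_stream() -> StreamMetrics {
    StreamMetrics::new()
        .with_name("payments-ingest".to_string())
        .with_health_thresholds(HealthThresholds::strict())
}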

impl StreamMetrics {
    /// Create a new metrics instance with a generated name and a start time of now
    pub fn new() -> Self {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        Self {
            name: Some(format!("rs2-stream-{}", timestamp)),
            start_time: Some(Instant::now()),
            ..Default::default()
        }
    }

    /// Builder-style: set the stream name
    pub fn with_name(mut self, name: String) -> Self {
        self.name = Some(name);
        self
    }

    /// Set the stream name in place
    pub fn set_name(&mut self, name: String) {
        self.name = Some(name);
    }

    /// Builder-style: set the health thresholds
    pub fn with_health_thresholds(mut self, thresholds: HealthThresholds) -> Self {
        self.health_thresholds = thresholds;
        self
    }

    /// Set the health thresholds in place
    pub fn set_health_thresholds(&mut self, thresholds: HealthThresholds) {
        self.health_thresholds = thresholds;
    }

    /// Record a successfully processed item and its size in bytes
    pub fn record_item(&mut self, size_bytes: u64) {
        self.items_processed += 1;
        self.bytes_processed += size_bytes;
        self.last_activity = Some(Instant::now());
        self.consecutive_errors = 0;
        self.update_derived_metrics();
    }

    /// Record a processing error
    pub fn record_error(&mut self) {
        self.errors += 1;
        self.consecutive_errors += 1;
        self.last_activity = Some(Instant::now());
        self.update_derived_metrics();
    }

    /// Record a retry attempt
    pub fn record_retry(&mut self) {
        self.retries += 1;
    }

    /// Add to the total processing time and track the peak duration
    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_time += duration;
        if duration > self.peak_processing_time {
            self.peak_processing_time = duration;
        }
    }

    /// Record a backpressure event
    pub fn record_backpressure(&mut self) {
        self.backpressure_events += 1;
    }

    /// Update the current queue depth
    pub fn update_queue_depth(&mut self, depth: u64) {
        self.queue_depth = depth;
    }

    /// Finalize the metrics: freeze `processing_time` at the elapsed wall-clock
    /// time and refresh the derived metrics
    pub fn finalize(&mut self) {
        if let Some(start) = self.start_time.take() {
            self.processing_time = start.elapsed();
        }
        self.update_derived_metrics();
    }

    /// Items per second based on the accumulated `processing_time`
    pub fn throughput_items_per_sec(&self) -> f64 {
        if self.processing_time.as_secs_f64() > 0.0 {
            self.items_processed as f64 / self.processing_time.as_secs_f64()
        } else {
            0.0
        }
    }

    /// Bytes per second based on the accumulated `processing_time`
    pub fn throughput_bytes_per_sec(&self) -> f64 {
        if self.processing_time.as_secs_f64() > 0.0 {
            self.bytes_processed as f64 / self.processing_time.as_secs_f64()
        } else {
            0.0
        }
    }

    /// Recompute rates, average item size, and error rate from the raw counters
    pub fn update_derived_metrics(&mut self) {
        if let Some(start) = self.start_time {
            let elapsed_secs = start.elapsed().as_secs_f64();
            if elapsed_secs > 0.0 {
                self.items_per_second = self.items_processed as f64 / elapsed_secs;
                self.bytes_per_second = self.bytes_processed as f64 / elapsed_secs;
            }
        }

        if self.items_processed > 0 {
            self.average_item_size = self.bytes_processed as f64 / self.items_processed as f64;
        }

        let total_attempts = self.items_processed + self.errors;
        if total_attempts > 0 {
            self.error_rate = self.errors as f64 / total_attempts as f64;
        }
    }

    /// True while the error rate and consecutive-error count stay below the
    /// configured health thresholds
    pub fn is_healthy(&self) -> bool {
        self.error_rate < self.health_thresholds.max_error_rate
            && self.consecutive_errors < self.health_thresholds.max_consecutive_errors
    }

    /// Human-readable throughput based on the wall-clock rates
    pub fn throughput_summary(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.items_per_second,
            self.bytes_per_second / 1000.0
        )
    }

    /// Human-readable throughput based on the accumulated processing time
    pub fn throughput_summary_processing_time(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.throughput_items_per_sec(),
            self.throughput_bytes_per_sec() / 1000.0
        )
    }
}
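
// A minimal, self-contained usage sketch of the metrics lifecycle; the counts,
// sizes, and thresholds below are illustrative test values, not taken from the
// original module.
#[cfg(test)]
mod example_usage {
    use super::*;

    #[test]
    fn records_items_and_reports_health_and_throughput() {
        let mut metrics = StreamMetrics::new()
            .with_name("example-stream".to_string())
            .with_health_thresholds(HealthThresholds::strict());

        // Simulate a small batch: 9 items of 1 KB each, one error, one retry.
        for _ in 0..9 {
            metrics.record_item(1024);
            metrics.record_processing_time(Duration::from_millis(2));
        }
        metrics.record_error();
        metrics.record_retry();
        metrics.record_backpressure();
        metrics.update_queue_depth(3);

        // Freeze processing_time at the elapsed wall-clock time.
        metrics.finalize();

        assert_eq!(metrics.items_processed, 9);
        assert_eq!(metrics.errors, 1);
        // 1 error out of 10 attempts is a 10% error rate, which violates the
        // strict 1% threshold, so the stream reports unhealthy.
        assert!(!metrics.is_healthy());
        // Both summaries are formatted as "<items/sec>, <KB/sec>".
        assert!(metrics.throughput_summary().contains("items/sec"));
        assert!(metrics
            .throughput_summary_processing_time()
            .contains("items/sec"));
    }
}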