rs2_stream/
stream_performance_metrics.rs

//! Performance optimization utilities for RStream
//!
//! This module provides buffering strategies, metrics collection,
//! and performance monitoring tools.
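//!
//! # Quick start
//!
//! A minimal sketch of the typical metrics lifecycle. The import path below is
//! an assumption; adjust it to this crate's actual module layout.
//!
//! ```
//! use rs2_stream::stream_performance_metrics::StreamMetrics;
//!
//! let mut metrics = StreamMetrics::new().with_name("example-stream".to_string());
//! metrics.record_item(512);  // one item, 512 bytes
//! metrics.record_item(1024);
//! metrics.finalize();
//!
//! assert_eq!(metrics.items_processed, 2);
//! assert_eq!(metrics.bytes_processed, 1536);
//! println!("{}", metrics.throughput_summary_processing_time());
//! ```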

use std::time::{Duration, Instant};

/// Buffer configuration: initial size, growth strategy, and an optional maximum capacity.
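///
/// # Example
///
/// A minimal sketch of overriding the defaults; the values are arbitrary, and
/// the import path is an assumption.
///
/// ```
/// use rs2_stream::stream_performance_metrics::{BufferConfig, GrowthStrategy};
///
/// // Defaults: initial_capacity 8192, max_capacity 1_048_576, 1.5x exponential growth.
/// let config = BufferConfig {
///     initial_capacity: 4096,
///     growth_strategy: GrowthStrategy::Linear(4096),
///     ..BufferConfig::default()
/// };
/// assert_eq!(config.max_capacity, Some(1_048_576));
/// ```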
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Capacity allocated up front.
    pub initial_capacity: usize,
    /// Optional upper bound on buffer growth.
    pub max_capacity: Option<usize>,
    /// How the buffer grows once it fills up.
    pub growth_strategy: GrowthStrategy,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            initial_capacity: 8192,        // 8 * 1024
            max_capacity: Some(1_048_576), // 1024 * 1024
            growth_strategy: GrowthStrategy::Exponential(1.5),
        }
    }
}

/// Strategy for growing buffers
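///
/// # Example
///
/// A minimal sketch of how a consumer might apply a strategy when resizing;
/// `next_capacity` is illustrative and not part of this module, and the import
/// path is an assumption.
///
/// ```
/// use rs2_stream::stream_performance_metrics::GrowthStrategy;
///
/// fn next_capacity(current: usize, strategy: &GrowthStrategy, max: Option<usize>) -> usize {
///     let grown = match strategy {
///         GrowthStrategy::Linear(step) => current + step,
///         GrowthStrategy::Exponential(factor) => (current as f64 * factor).ceil() as usize,
///         GrowthStrategy::Fixed => current,
///     };
///     // Respect an optional hard cap, as in `BufferConfig::max_capacity`.
///     max.map_or(grown, |cap| grown.min(cap))
/// }
///
/// assert_eq!(next_capacity(8192, &GrowthStrategy::Exponential(1.5), Some(10_000)), 10_000);
/// assert_eq!(next_capacity(8192, &GrowthStrategy::Fixed, None), 8192);
/// ```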
#[derive(Debug, Clone)]
pub enum GrowthStrategy {
    /// Grow linearly by a fixed amount
    Linear(usize),
    /// Grow exponentially by a multiplier
    Exponential(f64),
    /// Fixed size, don't grow
    Fixed,
}

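/// Error-rate and consecutive-error limits used by [`StreamMetrics::is_healthy`].
///
/// # Example
///
/// A minimal sketch of picking a profile; the custom values are arbitrary, and
/// the import path is an assumption.
///
/// ```
/// use rs2_stream::stream_performance_metrics::HealthThresholds;
///
/// let strict = HealthThresholds::strict();   // 1% error rate, 2 consecutive errors
/// let relaxed = HealthThresholds::relaxed(); // 20% error rate, 20 consecutive errors
/// let custom = HealthThresholds::custom(0.05, 10);
///
/// assert!(custom.max_error_rate < relaxed.max_error_rate);
/// assert!(strict.max_consecutive_errors < custom.max_consecutive_errors);
/// ```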
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Error rate (errors / total attempts) at or above which the stream is considered unhealthy.
    pub max_error_rate: f64,
    /// Number of consecutive errors at or above which the stream is considered unhealthy.
    pub max_consecutive_errors: u64,
}

impl Default for HealthThresholds {
    fn default() -> Self {
        Self {
            max_error_rate: 0.1,       // 10% error rate
            max_consecutive_errors: 5, // 5 consecutive errors
        }
    }
}

impl HealthThresholds {
    /// Conservative thresholds for critical systems
    pub fn strict() -> Self {
        Self {
            max_error_rate: 0.01,      // 1% error rate
            max_consecutive_errors: 2, // 2 consecutive errors
        }
    }

    /// Permissive thresholds for high-throughput systems
    pub fn relaxed() -> Self {
        Self {
            max_error_rate: 0.20,       // 20% error rate
            max_consecutive_errors: 20, // 20 consecutive errors
        }
    }

    /// Custom thresholds
    pub fn custom(max_error_rate: f64, max_consecutive_errors: u64) -> Self {
        Self {
            max_error_rate,
            max_consecutive_errors,
        }
    }
}

/// Metrics collected for rs2_stream operations
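///
/// # Example
///
/// A sketch of health monitoring with strict thresholds; the stream name and
/// counts are arbitrary example values, and the import path is an assumption.
///
/// ```
/// use rs2_stream::stream_performance_metrics::{HealthThresholds, StreamMetrics};
///
/// let mut metrics = StreamMetrics::new()
///     .with_name("payments-stream".to_string())
///     .with_health_thresholds(HealthThresholds::strict());
///
/// for _ in 0..97 {
///     metrics.record_item(256);
/// }
/// metrics.record_error();
/// metrics.record_retry();
/// metrics.record_backpressure();
/// metrics.update_queue_depth(42);
///
/// // 1 error out of 98 attempts is above the strict 1% limit.
/// assert!(!metrics.is_healthy());
/// assert_eq!(metrics.backpressure_events, 1);
/// ```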
#[derive(Debug, Clone, Default)]
pub struct StreamMetrics {
    pub name: Option<String>,
    pub items_processed: u64,
    pub bytes_processed: u64,
    pub processing_time: Duration,
    pub errors: u64,
    pub start_time: Option<Instant>,

    // Extended counters and derived metrics (throughput, health, backpressure).
    pub retries: u64,
    pub items_per_second: f64,
    pub bytes_per_second: f64,
    pub average_item_size: f64,
    pub peak_processing_time: Duration,
    pub last_activity: Option<Instant>,
    pub consecutive_errors: u64,
    pub error_rate: f64,
    pub backpressure_events: u64,
    pub queue_depth: u64,
    pub health_thresholds: HealthThresholds,
}

impl StreamMetrics {
    /// Create metrics with a timestamped default name and start the wall clock.
    pub fn new() -> Self {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        Self {
            name: Some(format!("rs2-stream-{}", timestamp)),
            start_time: Some(Instant::now()),
            ..Default::default()
        }
    }

    pub fn with_name(mut self, name: String) -> Self {
        self.name = Some(name);
        self
    }

    pub fn set_name(&mut self, name: String) {
        self.name = Some(name);
    }

    pub fn with_health_thresholds(mut self, thresholds: HealthThresholds) -> Self {
        self.health_thresholds = thresholds;
        self
    }

    pub fn set_health_thresholds(&mut self, thresholds: HealthThresholds) {
        self.health_thresholds = thresholds;
    }

    /// Record one successfully processed item of `size_bytes`; resets the consecutive-error run.
    pub fn record_item(&mut self, size_bytes: u64) {
        self.items_processed += 1;
        self.bytes_processed += size_bytes;
        self.last_activity = Some(Instant::now());
        self.consecutive_errors = 0;
        self.update_derived_metrics();
    }

    /// Record a failed item and extend the consecutive-error run.
    pub fn record_error(&mut self) {
        self.errors += 1;
        self.consecutive_errors += 1;
        self.last_activity = Some(Instant::now());
        self.update_derived_metrics();
    }

    /// Count a retry attempt.
    pub fn record_retry(&mut self) {
        self.retries += 1;
    }

    /// Add to the cumulative processing time and track the peak single recorded duration.
    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_time += duration;
        if duration > self.peak_processing_time {
            self.peak_processing_time = duration;
        }
    }

    /// Count a backpressure event.
    pub fn record_backpressure(&mut self) {
        self.backpressure_events += 1;
    }

    /// Record the current queue depth.
    pub fn update_queue_depth(&mut self, depth: u64) {
        self.queue_depth = depth;
    }

    /// Stop the clock: replaces `processing_time` with the total wall-clock elapsed
    /// time and refreshes the derived metrics.
    pub fn finalize(&mut self) {
        // Refresh the rates while `start_time` is still available, then stop the clock;
        // `update_derived_metrics` skips the rate update once `start_time` has been taken.
        self.update_derived_metrics();
        if let Some(start) = self.start_time.take() {
            self.processing_time = start.elapsed();
        }
    }

    /// Items per second based on the recorded `processing_time`.
    pub fn throughput_items_per_sec(&self) -> f64 {
        if self.processing_time.as_secs_f64() > 0.0 {
            self.items_processed as f64 / self.processing_time.as_secs_f64()
        } else {
            0.0
        }
    }

    /// Bytes per second based on the recorded `processing_time`.
    pub fn throughput_bytes_per_sec(&self) -> f64 {
        if self.processing_time.as_secs_f64() > 0.0 {
            self.bytes_processed as f64 / self.processing_time.as_secs_f64()
        } else {
            0.0
        }
    }

    /// Recompute the per-second rates, average item size, and error rate.
    pub fn update_derived_metrics(&mut self) {
        if let Some(start) = self.start_time {
            let elapsed_secs = start.elapsed().as_secs_f64();
            if elapsed_secs > 0.0 {
                self.items_per_second = self.items_processed as f64 / elapsed_secs;
                self.bytes_per_second = self.bytes_processed as f64 / elapsed_secs;
            }
        }

        if self.items_processed > 0 {
            self.average_item_size = self.bytes_processed as f64 / self.items_processed as f64;
        }

        let total_attempts = self.items_processed + self.errors;
        if total_attempts > 0 {
            self.error_rate = self.errors as f64 / total_attempts as f64;
        }
    }

    /// True while both the error rate and the consecutive-error run are below the
    /// configured [`HealthThresholds`].
    pub fn is_healthy(&self) -> bool {
        self.error_rate < self.health_thresholds.max_error_rate
            && self.consecutive_errors < self.health_thresholds.max_consecutive_errors
    }

    /// Human-readable throughput based on the live `items_per_second` / `bytes_per_second` fields.
    pub fn throughput_summary(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.items_per_second,
            self.bytes_per_second / 1000.0
        )
    }

    /// Human-readable throughput based on the recorded `processing_time`.
    pub fn throughput_summary_processing_time(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.throughput_items_per_sec(),
            self.throughput_bytes_per_sec() / 1000.0
        )
    }
}
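
// Illustrative tests sketching the expected behavior of the derived metrics;
// sizes and thresholds below are arbitrary example values.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn derived_metrics_update_on_record() {
        let mut metrics = StreamMetrics::new();
        metrics.record_item(100);
        metrics.record_item(300);
        metrics.record_error();

        assert_eq!(metrics.items_processed, 2);
        assert_eq!(metrics.bytes_processed, 400);
        assert!((metrics.average_item_size - 200.0).abs() < f64::EPSILON);
        assert!((metrics.error_rate - 1.0 / 3.0).abs() < 1e-9); // 1 error in 3 attempts
        assert_eq!(metrics.consecutive_errors, 1);
    }

    #[test]
    fn health_check_respects_thresholds() {
        let mut metrics = StreamMetrics::new();
        for _ in 0..99 {
            metrics.record_item(1);
        }
        metrics.record_error();

        // 1% error rate: healthy under the default 10% limit...
        assert!(metrics.is_healthy());
        // ...but not under the strict 1% limit (the comparison is strict `<`).
        metrics.set_health_thresholds(HealthThresholds::strict());
        assert!(!metrics.is_healthy());
    }
}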