use std::time::{Duration, Instant};
/// Sizing parameters for a buffer.
///
/// This type is plain configuration data: nothing in this definition allocates
/// or enforces the limits it describes.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Capacity the buffer starts with.
    pub initial_capacity: usize,
    /// Optional ceiling on capacity.
    // NOTE(review): presumably `None` means "no limit" — confirm against the consumer.
    pub max_capacity: Option<usize>,
    /// Policy describing how capacity grows (see `GrowthStrategy`).
    pub growth_strategy: GrowthStrategy,
}

impl Default for BufferConfig {
    /// Builds the default configuration: initial capacity 8192, maximum
    /// capacity 1 048 576 and `GrowthStrategy::Exponential(1.5)`.
    fn default() -> Self {
        let initial_capacity = 8 * 1024;
        let max_capacity = Some(1024 * 1024);
        let growth_strategy = GrowthStrategy::Exponential(1.5);

        BufferConfig {
            initial_capacity,
            max_capacity,
            growth_strategy,
        }
    }
}
/// Policy selecting how a buffer's capacity grows.
///
/// The variants only carry data; how each one is interpreted is decided by the
/// code that consumes a `BufferConfig`.
#[derive(Debug, Clone)]
pub enum GrowthStrategy {
    /// Carries a `usize` parameter.
    // NOTE(review): presumably the fixed increment added on each growth step — confirm.
    Linear(usize),
    /// Carries an `f64` parameter; `BufferConfig::default()` uses `1.5`.
    // NOTE(review): presumably a multiplicative growth factor — confirm.
    Exponential(f64),
    /// Carries no parameter.
    // NOTE(review): presumably means the capacity never grows — confirm.
    Fixed,
}
/// Limits against which a stream's error statistics are compared.
///
/// `StreamMetrics::is_healthy` treats a stream as healthy only while both of
/// its corresponding statistics are strictly below these limits.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Error-rate limit; a rate equal to or above this value is unhealthy.
    pub max_error_rate: f64,
    /// Consecutive-error limit; a count equal to or above this value is unhealthy.
    pub max_consecutive_errors: u64,
}

impl Default for HealthThresholds {
    /// Default limits: error rate `0.1`, `5` consecutive errors.
    fn default() -> Self {
        Self::custom(0.1, 5)
    }
}

impl HealthThresholds {
    /// Tight preset: error rate `0.01`, `2` consecutive errors.
    pub fn strict() -> Self {
        Self::custom(0.01, 2)
    }

    /// Lenient preset: error rate `0.20`, `20` consecutive errors.
    pub fn relaxed() -> Self {
        Self::custom(0.20, 20)
    }

    /// Builds thresholds from caller-supplied limits; the values are stored
    /// as given, without validation.
    pub fn custom(max_error_rate: f64, max_consecutive_errors: u64) -> Self {
        HealthThresholds {
            max_error_rate,
            max_consecutive_errors,
        }
    }
}
/// Counters and derived statistics describing the activity of one stream.
///
/// The `record_*` methods update the raw counters; rates, averages and the
/// error rate are recomputed by [`StreamMetrics::update_derived_metrics`].
#[derive(Debug, Clone, Default)]
pub struct StreamMetrics {
    /// Optional label; [`StreamMetrics::new`] generates `rs2-stream-<millis>`.
    pub name: Option<String>,
    /// Number of items recorded through `record_item`.
    pub items_processed: u64,
    /// Sum of the sizes passed to `record_item` (saturating at `u64::MAX`).
    pub bytes_processed: u64,
    /// Accumulated by `record_processing_time`; replaced with the wall-clock
    /// time since `start_time` by `finalize`.
    pub processing_time: Duration,
    /// Number of errors recorded through `record_error`.
    pub errors: u64,
    /// Moment measurement started. `None` for `Default::default()` and after
    /// `finalize` has run.
    pub start_time: Option<Instant>,
    /// Number of retries recorded through `record_retry`.
    pub retries: u64,
    /// `items_processed` divided by the seconds elapsed since `start_time`,
    /// as of the last refresh.
    pub items_per_second: f64,
    /// `bytes_processed` divided by the seconds elapsed since `start_time`,
    /// as of the last refresh.
    pub bytes_per_second: f64,
    /// `bytes_processed / items_processed`, as of the last refresh.
    pub average_item_size: f64,
    /// Largest single duration passed to `record_processing_time`.
    pub peak_processing_time: Duration,
    /// Time of the most recent `record_item` or `record_error` call.
    pub last_activity: Option<Instant>,
    /// Errors recorded since the last `record_item` call.
    pub consecutive_errors: u64,
    /// `errors / (items_processed + errors)`, as of the last refresh.
    pub error_rate: f64,
    /// Number of `record_backpressure` calls.
    pub backpressure_events: u64,
    /// Last value passed to `update_queue_depth`.
    pub queue_depth: u64,
    /// Limits used by `is_healthy`.
    pub health_thresholds: HealthThresholds,
}

impl StreamMetrics {
    /// Creates metrics with a generated name (`rs2-stream-<unix millis>`) and
    /// `start_time` set to now; every other field takes its default.
    pub fn new() -> Self {
        // `duration_since` fails when the system clock is set before the Unix
        // epoch. The timestamp only decorates the generated name, so fall back
        // to 0 instead of panicking.
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_millis());
        Self {
            name: Some(format!("rs2-stream-{}", timestamp)),
            start_time: Some(Instant::now()),
            ..Default::default()
        }
    }

    /// Builder-style setter for `name`.
    pub fn with_name(mut self, name: String) -> Self {
        self.name = Some(name);
        self
    }

    /// Replaces `name` in place.
    pub fn set_name(&mut self, name: String) {
        self.name = Some(name);
    }

    /// Builder-style setter for `health_thresholds`.
    pub fn with_health_thresholds(mut self, thresholds: HealthThresholds) -> Self {
        self.health_thresholds = thresholds;
        self
    }

    /// Replaces `health_thresholds` in place.
    pub fn set_health_thresholds(&mut self, thresholds: HealthThresholds) {
        self.health_thresholds = thresholds;
    }

    /// Records one successfully processed item of `size_bytes` bytes.
    ///
    /// Resets `consecutive_errors`, stamps `last_activity` and refreshes the
    /// derived metrics.
    pub fn record_item(&mut self, size_bytes: u64) {
        self.items_processed += 1;
        // Saturate rather than overflow: `size_bytes` is caller-controlled.
        self.bytes_processed = self.bytes_processed.saturating_add(size_bytes);
        self.last_activity = Some(Instant::now());
        self.consecutive_errors = 0;
        self.update_derived_metrics();
    }

    /// Records one error, stamps `last_activity` and refreshes the derived
    /// metrics.
    pub fn record_error(&mut self) {
        self.errors += 1;
        self.consecutive_errors += 1;
        self.last_activity = Some(Instant::now());
        self.update_derived_metrics();
    }

    /// Counts one retry.
    pub fn record_retry(&mut self) {
        self.retries += 1;
    }

    /// Adds `duration` to `processing_time` and raises
    /// `peak_processing_time` if `duration` exceeds it.
    pub fn record_processing_time(&mut self, duration: Duration) {
        // `Duration`'s `+=` panics on overflow; saturate instead.
        self.processing_time = self.processing_time.saturating_add(duration);
        if duration > self.peak_processing_time {
            self.peak_processing_time = duration;
        }
    }

    /// Counts one backpressure event.
    pub fn record_backpressure(&mut self) {
        self.backpressure_events += 1;
    }

    /// Stores the current queue depth.
    pub fn update_queue_depth(&mut self, depth: u64) {
        self.queue_depth = depth;
    }

    /// Closes the measurement window.
    ///
    /// If `start_time` is set, it is cleared, `processing_time` becomes the
    /// wall-clock time since it, and the per-second rates are recomputed from
    /// that same final duration. Average item size and error rate are then
    /// refreshed. With `start_time` already `None`, only the latter happens,
    /// so repeated calls do not move `processing_time`.
    pub fn finalize(&mut self) {
        if let Some(start) = self.start_time.take() {
            let elapsed = start.elapsed();
            self.processing_time = elapsed;
            // `start_time` is now `None`, so `update_derived_metrics` would
            // skip the rates; compute them here from the final elapsed time.
            self.refresh_rates(elapsed.as_secs_f64());
        }
        self.update_derived_metrics();
    }

    /// Items per second of `processing_time`, or `0.0` when it is zero.
    pub fn throughput_items_per_sec(&self) -> f64 {
        let secs = self.processing_time.as_secs_f64();
        if secs > 0.0 {
            self.items_processed as f64 / secs
        } else {
            0.0
        }
    }

    /// Bytes per second of `processing_time`, or `0.0` when it is zero.
    pub fn throughput_bytes_per_sec(&self) -> f64 {
        let secs = self.processing_time.as_secs_f64();
        if secs > 0.0 {
            self.bytes_processed as f64 / secs
        } else {
            0.0
        }
    }

    /// Recomputes the derived figures from the raw counters.
    ///
    /// Each figure is overwritten only when its denominator is non-zero (and,
    /// for the rates, only while `start_time` is set); otherwise the previous
    /// value is kept.
    pub fn update_derived_metrics(&mut self) {
        if let Some(start) = self.start_time {
            self.refresh_rates(start.elapsed().as_secs_f64());
        }
        if self.items_processed > 0 {
            self.average_item_size = self.bytes_processed as f64 / self.items_processed as f64;
        }
        let total_attempts = self.items_processed + self.errors;
        if total_attempts > 0 {
            self.error_rate = self.errors as f64 / total_attempts as f64;
        }
    }

    /// Sets the per-second rates from the counters over `elapsed_secs`,
    /// leaving them untouched when no time has elapsed.
    fn refresh_rates(&mut self, elapsed_secs: f64) {
        if elapsed_secs > 0.0 {
            self.items_per_second = self.items_processed as f64 / elapsed_secs;
            self.bytes_per_second = self.bytes_processed as f64 / elapsed_secs;
        }
    }

    /// Returns `true` while both `error_rate` and `consecutive_errors` are
    /// strictly below their limits in `health_thresholds`.
    pub fn is_healthy(&self) -> bool {
        self.error_rate < self.health_thresholds.max_error_rate
            && self.consecutive_errors < self.health_thresholds.max_consecutive_errors
    }

    /// Formats the wall-clock rates (`items_per_second`, `bytes_per_second`);
    /// "KB" here is 1000 bytes.
    pub fn throughput_summary(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.items_per_second,
            self.bytes_per_second / 1000.0
        )
    }

    /// Formats the rates relative to `processing_time`; "KB" here is 1000
    /// bytes.
    pub fn throughput_summary_processing_time(&self) -> String {
        format!(
            "{:.1} items/sec, {:.1} KB/sec",
            self.throughput_items_per_sec(),
            self.throughput_bytes_per_sec() / 1000.0
        )
    }
}