//! Advanced concurrency utilities for the spider framework.
//!
//! This module provides utilities for adaptive concurrency control,
//! resource management, and performance optimization across the framework.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use dashmap::DashMap;
use tokio::time::Instant;
11
12/// Adaptive semaphore that adjusts its permits based on performance metrics
13#[allow(dead_code)]
14pub(crate) struct AdaptiveSemaphore {
15    /// Current number of permits
16    permits: AtomicUsize,
17    /// Maximum number of permits allowed
18    max_permits: usize,
19    /// Minimum number of permits allowed
20    min_permits: usize,
21    /// Performance metrics for decision making
22    metrics: Arc<DashMap<String, PerformanceMetric>>,
23    /// Target response time threshold (in milliseconds)
24    target_response_time_ms: u64,
25}
26
27/// Performance metric for tracking request/response times
28#[derive(Debug, Clone)]
29pub struct PerformanceMetric {
30    /// Average response time
31    pub avg_response_time: Duration,
32    /// Number of samples
33    pub sample_count: usize,
34    /// Error rate
35    pub error_rate: f64,
36    /// Last update time
37    pub last_update: Instant,
38}
39
40impl AdaptiveSemaphore {
41    /// Creates a new adaptive semaphore
42    pub fn new(initial_permits: usize, max_permits: usize, min_permits: usize) -> Self {
43        Self {
44            permits: AtomicUsize::new(initial_permits),
45            max_permits,
46            min_permits,
47            metrics: Arc::new(DashMap::new()),
48            target_response_time_ms: 1000, // 1 second default
49        }
50    }
51
52    /// Updates performance metrics for a specific endpoint
53    pub async fn update_metrics(&self, endpoint: &str, response_time: Duration, success: bool) {
54        let success_flag = if success { 1.0 } else { 0.0 };
55
56        self.metrics
57            .entry(endpoint.to_string())
58            .and_modify(|metric| {
59                // Update average response time with exponential moving average
60                let new_avg = Duration::from_nanos(
61                    ((metric.avg_response_time.as_nanos() as f64 * 0.7) +
62                     (response_time.as_nanos() as f64 * 0.3)) as u64
63                );
64
65                // Update error rate
66                let total_samples = metric.sample_count as f64 + 1.0;
67                let new_error_rate = (metric.error_rate * metric.sample_count as f64 + (1.0 - success_flag)) / total_samples;
68
69                metric.avg_response_time = new_avg;
70                metric.error_rate = new_error_rate;
71                metric.sample_count += 1;
72                metric.last_update = Instant::now();
73            })
74            .or_insert(PerformanceMetric {
75                avg_response_time: response_time,
76                sample_count: 1,
77                error_rate: 1.0 - success_flag,
78                last_update: Instant::now(),
79            });
80    }
81
82    /// Gets the current number of permits
83    pub fn current_permits(&self) -> usize {
84        self.permits.load(Ordering::SeqCst)
85    }
86
87    /// Adjusts permits based on performance metrics
88    pub fn adjust_permits(&self) {
89        let current_permits = self.permits.load(Ordering::SeqCst);
90        
91        // Calculate average metrics across all endpoints
92        let mut total_response_time = Duration::from_millis(0);
93        let mut total_error_rate = 0.0;
94        let mut endpoint_count = 0;
95        
96        for metric in self.metrics.iter() {
97            total_response_time += metric.avg_response_time;
98            total_error_rate += metric.error_rate;
99            endpoint_count += 1;
100        }
101        
102        if endpoint_count == 0 {
103            return; // No metrics to base decision on
104        }
105        
106        let avg_response_time = Duration::from_nanos(
107            (total_response_time.as_nanos() / endpoint_count as u128) as u64
108        );
109        let avg_error_rate = total_error_rate / endpoint_count as f64;
110        
111        let mut new_permits = current_permits;
112        
113        // Adjust based on response time
114        if avg_response_time.as_millis() > self.target_response_time_ms as u128 {
115            // Response time too high, decrease permits
116            new_permits = new_permits.saturating_sub(1).max(self.min_permits);
117        } else if avg_response_time.as_millis() < (self.target_response_time_ms / 2) as u128 {
118            // Response time good, increase permits if not at max
119            if new_permits < self.max_permits {
120                new_permits += 1;
121            }
122        }
123        
124        // Adjust based on error rate
125        if avg_error_rate > 0.1 { // More than 10% errors
126            new_permits = new_permits.saturating_sub(2).max(self.min_permits);
127        } else if avg_error_rate < 0.01 { // Less than 1% errors
128            if new_permits < self.max_permits {
129                new_permits = std::cmp::min(new_permits + 1, self.max_permits);
130            }
131        }
132
133        // Apply the new permit count
134        self.permits.store(new_permits, Ordering::SeqCst);
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_semaphore() {
        let semaphore = AdaptiveSemaphore::new(5, 10, 1);

        assert_eq!(semaphore.current_permits(), 5);

        // Record one fast, successful request (100 ms, well under the
        // 1000 ms default target).
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            semaphore
                .update_metrics("test_endpoint", Duration::from_millis(100), true)
                .await;
        });
        semaphore.adjust_permits();

        // 100 ms < target/2 (500 ms)  → +1 permit (5 → 6)
        // error rate 0.0 < 1%         → +1 permit (6 → 7)
        assert_eq!(semaphore.current_permits(), 7);
    }
}
159}