spider_core/
concurrency.rs1use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::Duration;
9use tokio::time::Instant;
10use dashmap::DashMap;
11
12#[allow(dead_code)]
14pub(crate) struct AdaptiveSemaphore {
15 permits: AtomicUsize,
17 max_permits: usize,
19 min_permits: usize,
21 metrics: Arc<DashMap<String, PerformanceMetric>>,
23 target_response_time_ms: u64,
25}
26
27#[derive(Debug, Clone)]
29pub struct PerformanceMetric {
30 pub avg_response_time: Duration,
32 pub sample_count: usize,
34 pub error_rate: f64,
36 pub last_update: Instant,
38}
39
40impl AdaptiveSemaphore {
41 pub fn new(initial_permits: usize, max_permits: usize, min_permits: usize) -> Self {
43 Self {
44 permits: AtomicUsize::new(initial_permits),
45 max_permits,
46 min_permits,
47 metrics: Arc::new(DashMap::new()),
48 target_response_time_ms: 1000, }
50 }
51
52 pub async fn update_metrics(&self, endpoint: &str, response_time: Duration, success: bool) {
54 let success_flag = if success { 1.0 } else { 0.0 };
55
56 self.metrics
57 .entry(endpoint.to_string())
58 .and_modify(|metric| {
59 let new_avg = Duration::from_nanos(
61 ((metric.avg_response_time.as_nanos() as f64 * 0.7) +
62 (response_time.as_nanos() as f64 * 0.3)) as u64
63 );
64
65 let total_samples = metric.sample_count as f64 + 1.0;
67 let new_error_rate = (metric.error_rate * metric.sample_count as f64 + (1.0 - success_flag)) / total_samples;
68
69 metric.avg_response_time = new_avg;
70 metric.error_rate = new_error_rate;
71 metric.sample_count += 1;
72 metric.last_update = Instant::now();
73 })
74 .or_insert(PerformanceMetric {
75 avg_response_time: response_time,
76 sample_count: 1,
77 error_rate: 1.0 - success_flag,
78 last_update: Instant::now(),
79 });
80 }
81
82 pub fn current_permits(&self) -> usize {
84 self.permits.load(Ordering::SeqCst)
85 }
86
87 pub fn adjust_permits(&self) {
89 let current_permits = self.permits.load(Ordering::SeqCst);
90
91 let mut total_response_time = Duration::from_millis(0);
93 let mut total_error_rate = 0.0;
94 let mut endpoint_count = 0;
95
96 for metric in self.metrics.iter() {
97 total_response_time += metric.avg_response_time;
98 total_error_rate += metric.error_rate;
99 endpoint_count += 1;
100 }
101
102 if endpoint_count == 0 {
103 return; }
105
106 let avg_response_time = Duration::from_nanos(
107 (total_response_time.as_nanos() / endpoint_count as u128) as u64
108 );
109 let avg_error_rate = total_error_rate / endpoint_count as f64;
110
111 let mut new_permits = current_permits;
112
113 if avg_response_time.as_millis() > self.target_response_time_ms as u128 {
115 new_permits = new_permits.saturating_sub(1).max(self.min_permits);
117 } else if avg_response_time.as_millis() < (self.target_response_time_ms / 2) as u128 {
118 if new_permits < self.max_permits {
120 new_permits += 1;
121 }
122 }
123
124 if avg_error_rate > 0.1 { new_permits = new_permits.saturating_sub(2).max(self.min_permits);
127 } else if avg_error_rate < 0.01 { if new_permits < self.max_permits {
129 new_permits = std::cmp::min(new_permits + 1, self.max_permits);
130 }
131 }
132
133 self.permits.store(new_permits, Ordering::SeqCst);
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a semaphore with 5 permits (bounds 1 to 10), optionally records a
    /// single sample for one endpoint, runs one adjustment and returns the
    /// resulting permit count.
    fn permits_after_adjustment(sample: Option<(Duration, bool)>) -> usize {
        let semaphore = AdaptiveSemaphore::new(5, 10, 1);
        assert_eq!(semaphore.current_permits(), 5);

        if let Some((response_time, success)) = sample {
            // `update_metrics` is async, so drive it to completion on a
            // throwaway runtime.
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(semaphore.update_metrics("test_endpoint", response_time, success));
        }

        semaphore.adjust_permits();
        semaphore.current_permits()
    }

    #[test]
    fn test_adaptive_semaphore() {
        // 100 ms is below half of the 1000 ms target (+1) and the error rate is
        // 0 (+1), so the count must grow from 5 to exactly 7.
        let permits_after = permits_after_adjustment(Some((Duration::from_millis(100), true)));
        assert_eq!(permits_after, 7);
    }

    #[test]
    fn test_adaptive_semaphore_backs_off_on_slow_failures() {
        // 2000 ms is above the target (-1) and the error rate is 1.0 (-2),
        // so the count must shrink from 5 to exactly 2.
        let permits_after = permits_after_adjustment(Some((Duration::from_millis(2000), false)));
        assert_eq!(permits_after, 2);
    }

    #[test]
    fn test_adaptive_semaphore_without_metrics_is_unchanged() {
        // With no recorded endpoints the adjustment is a no-op.
        assert_eq!(permits_after_adjustment(None), 5);
    }
}
159}