use crate::yahoo_error::YahooError;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock};
/// Tunables for the adaptive rate limiter.
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimitConfig {
    /// Starting requests-per-second budget.
    pub base_rps: f64,
    /// Upper bound the adaptive algorithm may ramp up to.
    pub max_rps: f64,
    /// Lower bound the adaptive algorithm may throttle down to.
    pub min_rps: f64,
    /// Length of the sliding window used to count recent requests.
    pub window_duration: Duration,
    /// Requests allowed to exceed the steady rate in a burst.
    /// NOTE(review): not referenced by the code visible in this file —
    /// confirm it is consumed elsewhere.
    pub burst_capacity: usize,
    /// Fractional step (e.g. 0.1 = 10%) applied per rate adaptation.
    pub adaptation_factor: f64,
    /// Backoff/jitter parameters for delayed requests.
    pub backoff_config: BackoffConfig,
    /// How requests are handled once the limit is hit.
    pub priority_strategy: PriorityStrategy,
}
/// Exponential-backoff parameters for retry delays.
///
/// NOTE(review): only `jitter_factor` is read by the code visible in this
/// file — confirm the other fields are consumed elsewhere.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Delay applied on the first retry.
    pub initial_delay: Duration,
    /// Ceiling on the backoff delay.
    pub max_delay: Duration,
    /// Growth factor applied per retry.
    pub multiplier: f64,
    /// Fraction of the base delay added as random jitter
    /// (used by `AdaptiveRateLimiter::calculate_delay`).
    pub jitter_factor: f64,
}
/// Ordering policy for requests that exceed the current rate budget.
///
/// `check_rate_limit` only special-cases `Fifo` (returns a computed delay);
/// all other variants currently take the same enqueue path.
#[derive(Debug, Clone, PartialEq)]
pub enum PriorityStrategy {
    /// First-come-first-served; callers receive a suggested retry delay.
    Fifo,
    /// Strict priority ordering via the three-level queue.
    Priority,
    WeightedFair,
    Adaptive,
}
/// Tunables for the circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Recent failures needed to trip the circuit open.
    pub failure_threshold: usize,
    /// Successes in `HalfOpen` needed to close the circuit again.
    pub success_threshold: usize,
    /// Cool-down before an open circuit allows a probe request.
    pub timeout: Duration,
    /// Cap on the success/failure history deques; also reinterpreted as a
    /// number of seconds by `window_duration()`.
    pub window_size: usize,
    /// NOTE(review): not referenced by the visible code — confirm usage.
    pub health_check_interval: Duration,
    /// NOTE(review): not referenced by the visible code — confirm usage.
    pub recovery_strategy: RecoveryStrategy,
}
/// How an open circuit ramps traffic back up after recovery.
/// NOTE(review): not consumed by the code visible in this file — confirm
/// it is read elsewhere in the crate.
#[derive(Debug, Clone, PartialEq)]
pub enum RecoveryStrategy {
    Immediate,
    Gradual,
    Exponential,
}
/// Lifecycle state of the circuit breaker.
///
/// Fix: `Eq` and `Hash` are derived so the state can actually be used as
/// the key of `CircuitBreakerStats::state_durations`
/// (`HashMap<CircuitState, Duration>`) — insertion requires `Eq + Hash`.
/// `Copy` is added because the enum is a plain three-way tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitState {
    /// Requests flow normally; failures are being counted.
    Closed,
    /// Requests are rejected immediately; the upstream is considered down.
    Open,
    /// A limited number of trial requests probe whether the upstream recovered.
    HalfOpen,
}
/// Request priority; explicit discriminants give a total order
/// (`Low` < `Normal` < `High` < `Critical`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RequestPriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}
/// Verdict of a rate-limit check.
#[derive(Debug)]
pub enum RateLimitResult {
    /// Proceed immediately.
    Allowed,
    /// Retry after the embedded delay.
    Delayed(Duration),
    /// Do not proceed (e.g. the queue entry was dropped).
    Denied,
}
/// Outcome of a call routed through the circuit breaker.
#[derive(Debug)]
pub enum CircuitResult<T> {
    /// The operation ran and succeeded.
    Success(T),
    /// The operation ran but returned an error.
    Failure(YahooError),
    /// The operation was not attempted: the circuit is open.
    CircuitOpen,
}
/// Caller-supplied description of a request being rate-limited.
///
/// NOTE(review): of these fields, only `priority` is read by the code
/// visible in this file — confirm the rest are consumed elsewhere.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Queue placement when the request cannot proceed immediately.
    pub priority: RequestPriority,
    /// When the request was created.
    pub timestamp: Instant,
    /// Free-form label for the kind of request (e.g. "quote").
    pub request_type: String,
    /// Caller's estimate of how long the request will take.
    pub expected_duration: Option<Duration>,
    /// How many times this request has already been retried.
    pub retry_count: usize,
}
/// Adaptive, priority-aware rate limiter.
///
/// `current_rps` uses a synchronous `parking_lot::RwLock` (held only for
/// plain reads/writes, never across an `.await`); the remaining shared
/// state uses tokio's async primitives.
pub struct AdaptiveRateLimiter {
    config: AdaptiveRateLimitConfig,
    /// Live requests-per-second target, adjusted by `perform_adaptation`.
    current_rps: Arc<parking_lot::RwLock<f64>>,
    /// Timestamps of recently served requests (sliding-window counter).
    request_history: Arc<Mutex<VecDeque<Instant>>>,
    /// Requests parked while the window is saturated.
    request_queue: Arc<Mutex<RequestQueue>>,
    /// Aggregate counters exposed via `get_stats`.
    stats: Arc<RwLock<RateLimitStats>>,
    /// Rolling latency/error samples that drive rate adaptation.
    adaptation_state: Arc<RwLock<AdaptationState>>,
}
/// Circuit breaker with windowed success/failure bookkeeping.
pub struct IntelligentCircuitBreaker {
    config: CircuitBreakerConfig,
    /// Current `Closed` / `Open` / `HalfOpen` state.
    state: Arc<RwLock<CircuitState>>,
    /// Recent failures, capped at `config.window_size` entries.
    failure_history: Arc<Mutex<VecDeque<FailureRecord>>>,
    /// Recent success timestamps; cleared when entering `HalfOpen`.
    success_history: Arc<Mutex<VecDeque<Instant>>>,
    /// Aggregate counters exposed via `get_stats`.
    stats: Arc<RwLock<CircuitBreakerStats>>,
    /// Instant of the most recent state transition; drives the open-state
    /// cool-down check in `should_attempt_reset`.
    last_state_change: Arc<RwLock<Instant>>,
}
/// Three-level priority queue for requests awaiting a rate-limit verdict.
/// `Critical` and `High` share the `high_priority` lane.
struct RequestQueue {
    high_priority: VecDeque<PendingRequest>,
    normal_priority: VecDeque<PendingRequest>,
    low_priority: VecDeque<PendingRequest>,
    /// Cumulative enqueue counters.
    queue_stats: QueueStats,
}
/// A queued request plus the channel used to deliver its verdict.
#[derive(Debug)]
struct PendingRequest {
    metadata: RequestMetadata,
    /// Completing (or dropping) this sender releases the waiting caller.
    notify: tokio::sync::oneshot::Sender<RateLimitResult>,
}
/// Aggregate counters for the rate limiter.
#[derive(Debug, Clone)]
pub struct RateLimitStats {
    pub total_requests: u64,
    pub allowed_requests: u64,
    pub delayed_requests: u64,
    pub denied_requests: u64,
    /// Filled in at read time by `get_stats`.
    pub queue_length: usize,
    /// NOTE(review): never updated by the code visible in this file.
    pub avg_wait_time_ms: f64,
    /// Filled in at read time by `get_stats`.
    pub current_rps: f64,
    /// Number of times the live RPS target was materially changed.
    pub adaptations_count: u64,
}
/// Aggregate counters for the circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStats {
    pub total_requests: u64,
    pub successful_requests: u64,
    pub failed_requests: u64,
    /// Requests refused while the circuit was open.
    pub rejected_requests: u64,
    /// Time spent in each state.
    /// NOTE(review): never populated by the visible code, and inserting
    /// would require `CircuitState: Eq + Hash` — verify the derives.
    pub state_durations: HashMap<CircuitState, Duration>,
    /// Number of Closed -> Open trips.
    pub trip_count: u64,
    /// Number of Open -> HalfOpen probe attempts.
    pub recovery_attempts: u64,
}
/// Rolling samples and bookkeeping driving rate adaptation.
#[derive(Debug, Clone)]
struct AdaptationState {
    /// Last ~50 observed latencies.
    response_times: VecDeque<Duration>,
    /// Last ~20 per-request error indicators (1.0 = error, 0.0 = ok).
    error_rates: VecDeque<f64>,
    /// Start of the 10-second adaptation cooldown.
    last_adaptation: Instant,
    /// Direction of the most recent adjustment.
    adaptation_trend: AdaptationTrend,
}
/// Direction of the most recent rate adjustment.
#[derive(Debug, Clone, PartialEq)]
enum AdaptationTrend {
    Increasing,
    Stable,
    Decreasing,
}
/// One recorded failure, used for the trip decision in `record_failure`.
#[derive(Debug, Clone)]
struct FailureRecord {
    pub timestamp: Instant,
    /// `Display` rendering of the originating `YahooError`.
    pub error_type: String,
    /// How long the failing operation ran before erroring.
    pub response_time: Option<Duration>,
}
/// Cumulative enqueue counters per priority lane.
/// NOTE(review): counts only ever increase and `total_wait_time` is never
/// updated by the visible code — confirm the intended semantics.
#[derive(Debug, Clone)]
struct QueueStats {
    pub high_priority_count: usize,
    pub normal_priority_count: usize,
    pub low_priority_count: usize,
    pub total_wait_time: Duration,
}
impl Default for AdaptiveRateLimitConfig {
    /// Conservative defaults: 10 RPS baseline, adaptable within [1, 50] RPS
    /// over a 60-second window, adjusting by 10% per step.
    fn default() -> Self {
        Self {
            base_rps: 10.0,
            max_rps: 50.0,
            min_rps: 1.0,
            window_duration: Duration::from_secs(60),
            burst_capacity: 20,
            adaptation_factor: 0.1,
            backoff_config: BackoffConfig::default(),
            priority_strategy: PriorityStrategy::Priority,
        }
    }
}
impl Default for BackoffConfig {
    /// 100 ms initial delay doubling up to 60 s, with 10% jitter.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
            jitter_factor: 0.1,
        }
    }
}
impl Default for CircuitBreakerConfig {
    /// Trip after 5 recent failures; close again after 3 half-open
    /// successes; allow a probe after a 60-second cool-down.
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(60),
            window_size: 100,
            health_check_interval: Duration::from_secs(5),
            recovery_strategy: RecoveryStrategy::Gradual,
        }
    }
}
impl AdaptiveRateLimiter {
pub fn new(config: AdaptiveRateLimitConfig) -> Self {
let current_rps = Arc::new(parking_lot::RwLock::new(config.base_rps));
Self {
config,
current_rps,
request_history: Arc::new(Mutex::new(VecDeque::new())),
request_queue: Arc::new(Mutex::new(RequestQueue::new())),
stats: Arc::new(RwLock::new(RateLimitStats::default())),
adaptation_state: Arc::new(RwLock::new(AdaptationState::new())),
}
}
pub async fn check_rate_limit(
&self,
metadata: RequestMetadata,
) -> Result<RateLimitResult, YahooError> {
self.update_request_stats().await;
let current_rps = *self.current_rps.read();
let can_proceed = self.can_proceed_immediately(current_rps).await?;
if can_proceed {
self.update_request_history().await;
Ok(RateLimitResult::Allowed)
} else {
match self.config.priority_strategy {
PriorityStrategy::Fifo => {
let delay = self.calculate_delay().await;
Ok(RateLimitResult::Delayed(delay))
}
_ => {
self.queue_request(metadata).await
}
}
}
}
pub async fn adapt_rate_limits(
&self,
response_time: Duration,
success: bool,
) -> Result<(), YahooError> {
let mut adaptation_state = self.adaptation_state.write().await;
adaptation_state.response_times.push_back(response_time);
if adaptation_state.response_times.len() > 50 {
adaptation_state.response_times.pop_front();
}
let error_rate = if success { 0.0 } else { 1.0 };
adaptation_state.error_rates.push_back(error_rate);
if adaptation_state.error_rates.len() > 20 {
adaptation_state.error_rates.pop_front();
}
if adaptation_state.last_adaptation.elapsed() > Duration::from_secs(10) {
self.perform_adaptation(&mut adaptation_state).await?;
adaptation_state.last_adaptation = Instant::now();
}
Ok(())
}
pub async fn get_stats(&self) -> RateLimitStats {
let mut stats = self.stats.read().await.clone();
stats.current_rps = *self.current_rps.read();
stats.queue_length = self.get_queue_length().await;
stats
}
async fn can_proceed_immediately(&self, current_rps: f64) -> Result<bool, YahooError> {
let mut history = self.request_history.lock().await;
let now = Instant::now();
while let Some(&front_time) = history.front() {
if now.duration_since(front_time) > self.config.window_duration {
history.pop_front();
} else {
break;
}
}
let requests_in_window = history.len() as f64;
let max_requests = current_rps * self.config.window_duration.as_secs_f64();
Ok(requests_in_window < max_requests)
}
async fn calculate_delay(&self) -> Duration {
let current_rps = *self.current_rps.read();
let base_delay = Duration::from_secs_f64(1.0 / current_rps);
let jitter = self.config.backoff_config.jitter_factor;
let jitter_ms = (base_delay.as_millis() as f64 * jitter) as u64;
base_delay + Duration::from_millis(rand::random::<u64>() % jitter_ms.max(1))
}
async fn queue_request(
&self,
metadata: RequestMetadata,
) -> Result<RateLimitResult, YahooError> {
let (tx, rx) = tokio::sync::oneshot::channel();
let pending = PendingRequest {
metadata: metadata.clone(),
notify: tx,
};
{
let mut queue = self.request_queue.lock().await;
match metadata.priority {
RequestPriority::Critical | RequestPriority::High => {
queue.high_priority.push_back(pending);
queue.queue_stats.high_priority_count += 1;
}
RequestPriority::Normal => {
queue.normal_priority.push_back(pending);
queue.queue_stats.normal_priority_count += 1;
}
RequestPriority::Low => {
queue.low_priority.push_back(pending);
queue.queue_stats.low_priority_count += 1;
}
}
}
match rx.await {
Ok(result) => Ok(result),
Err(_) => Ok(RateLimitResult::Denied),
}
}
async fn perform_adaptation(
&self,
adaptation_state: &mut AdaptationState,
) -> Result<(), YahooError> {
let avg_response_time = if !adaptation_state.response_times.is_empty() {
adaptation_state.response_times.iter().sum::<Duration>()
/ adaptation_state.response_times.len() as u32
} else {
Duration::from_millis(100) };
let avg_error_rate = if !adaptation_state.error_rates.is_empty() {
adaptation_state.error_rates.iter().sum::<f64>()
/ adaptation_state.error_rates.len() as f64
} else {
0.0
};
let current_rps = *self.current_rps.read();
let mut new_rps = current_rps;
if avg_error_rate > 0.1 || avg_response_time > Duration::from_secs(5) {
new_rps =
(current_rps * (1.0 - self.config.adaptation_factor)).max(self.config.min_rps);
adaptation_state.adaptation_trend = AdaptationTrend::Decreasing;
} else if avg_error_rate < 0.01 && avg_response_time < Duration::from_secs(1) {
new_rps =
(current_rps * (1.0 + self.config.adaptation_factor)).min(self.config.max_rps);
adaptation_state.adaptation_trend = AdaptationTrend::Increasing;
} else {
adaptation_state.adaptation_trend = AdaptationTrend::Stable;
}
if (new_rps - current_rps).abs() > 0.1 {
*self.current_rps.write() = new_rps;
let mut stats = self.stats.write().await;
stats.adaptations_count += 1;
}
Ok(())
}
async fn update_request_history(&self) {
let mut history = self.request_history.lock().await;
history.push_back(Instant::now());
if history.len() > 1000 {
history.pop_front();
}
}
async fn update_request_stats(&self) {
let mut stats = self.stats.write().await;
stats.total_requests += 1;
}
async fn get_queue_length(&self) -> usize {
let queue = self.request_queue.lock().await;
queue.high_priority.len() + queue.normal_priority.len() + queue.low_priority.len()
}
}
impl IntelligentCircuitBreaker {
    /// Creates a breaker in the `Closed` state with empty histories.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: Arc::new(RwLock::new(CircuitState::Closed)),
            failure_history: Arc::new(Mutex::new(VecDeque::new())),
            success_history: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(RwLock::new(CircuitBreakerStats::default())),
            last_state_change: Arc::new(RwLock::new(Instant::now())),
        }
    }

    /// Runs `operation` through the breaker.
    ///
    /// `Open` rejects immediately unless the cool-down has elapsed, in which
    /// case the breaker enters `HalfOpen` and probes with this request.
    /// `Closed` and `HalfOpen` execute the operation and record the outcome.
    pub async fn call<F, T>(&self, operation: F) -> CircuitResult<T>
    where
        F: std::future::Future<Output = Result<T, YahooError>>,
    {
        let state = self.state.read().await.clone();
        match state {
            CircuitState::Open => {
                if self.should_attempt_reset().await {
                    self.transition_to_half_open().await;
                    self.execute_operation(operation).await
                } else {
                    self.record_rejected_request().await;
                    CircuitResult::CircuitOpen
                }
            }
            CircuitState::Closed | CircuitState::HalfOpen => {
                self.execute_operation(operation).await
            }
        }
    }

    /// Snapshot of the accumulated statistics.
    ///
    /// NOTE(review): `state_durations` is never populated by the code in
    /// this file and will always be empty — confirm whether it is needed.
    pub async fn get_stats(&self) -> CircuitBreakerStats {
        self.stats.read().await.clone()
    }

    /// Current breaker state.
    pub async fn get_state(&self) -> CircuitState {
        self.state.read().await.clone()
    }

    /// Awaits the operation, timing it, and routes the outcome to the
    /// success/failure bookkeeping.
    async fn execute_operation<F, T>(&self, operation: F) -> CircuitResult<T>
    where
        F: std::future::Future<Output = Result<T, YahooError>>,
    {
        let start_time = Instant::now();
        match operation.await {
            Ok(result) => {
                self.record_success(start_time.elapsed()).await;
                CircuitResult::Success(result)
            }
            Err(error) => {
                self.record_failure(error.clone(), start_time.elapsed())
                    .await;
                CircuitResult::Failure(error)
            }
        }
    }

    /// Records a success; in `HalfOpen`, reaching `success_threshold`
    /// successes (history is cleared on entering `HalfOpen`) closes the
    /// circuit.
    async fn record_success(&self, _response_time: Duration) {
        let mut success_history = self.success_history.lock().await;
        success_history.push_back(Instant::now());
        if success_history.len() > self.config.window_size {
            success_history.pop_front();
        }
        let mut stats = self.stats.write().await;
        stats.total_requests += 1;
        stats.successful_requests += 1;
        let state = self.state.read().await.clone();
        if state == CircuitState::HalfOpen
            && success_history.len() >= self.config.success_threshold
        {
            // Release our locks before the transition takes its own.
            drop(success_history);
            drop(stats);
            self.transition_to_closed().await;
        }
    }

    /// Records a failure; when recent failures reach `failure_threshold`
    /// the circuit trips open.
    async fn record_failure(&self, error: YahooError, response_time: Duration) {
        let failure = FailureRecord {
            timestamp: Instant::now(),
            error_type: error.to_string(),
            response_time: Some(response_time),
        };
        let mut failure_history = self.failure_history.lock().await;
        failure_history.push_back(failure);
        if failure_history.len() > self.config.window_size {
            failure_history.pop_front();
        }
        let mut stats = self.stats.write().await;
        stats.total_requests += 1;
        stats.failed_requests += 1;
        let state = self.state.read().await.clone();
        if state != CircuitState::Open {
            // Only failures inside the configured time window count.
            let recent_failures = failure_history
                .iter()
                .filter(|f| f.timestamp.elapsed() < self.config.window_duration())
                .count();
            if recent_failures >= self.config.failure_threshold {
                // Release our locks before the transition takes its own.
                drop(failure_history);
                drop(stats);
                self.transition_to_open().await;
            }
        }
    }

    /// Counts a request refused while the circuit was open.
    async fn record_rejected_request(&self) {
        let mut stats = self.stats.write().await;
        stats.rejected_requests += 1;
    }

    /// True once the open circuit has cooled down for `config.timeout`.
    async fn should_attempt_reset(&self) -> bool {
        let last_change = *self.last_state_change.read().await;
        last_change.elapsed() > self.config.timeout
    }

    /// Trips the circuit open and counts the trip.
    async fn transition_to_open(&self) {
        let mut state = self.state.write().await;
        *state = CircuitState::Open;
        let mut last_change = self.last_state_change.write().await;
        *last_change = Instant::now();
        let mut stats = self.stats.write().await;
        stats.trip_count += 1;
    }

    /// Enters the probing state; the success window restarts from zero so
    /// only post-recovery successes count toward closing.
    async fn transition_to_half_open(&self) {
        let mut state = self.state.write().await;
        *state = CircuitState::HalfOpen;
        let mut last_change = self.last_state_change.write().await;
        *last_change = Instant::now();
        let mut stats = self.stats.write().await;
        stats.recovery_attempts += 1;
        let mut success_history = self.success_history.lock().await;
        success_history.clear();
    }

    /// Returns to normal operation.
    ///
    /// Fix: the failure history is now cleared on close. Previously, stale
    /// pre-outage failures stayed in the window, so a single new failure
    /// could combine with them and immediately re-trip the breaker.
    async fn transition_to_closed(&self) {
        let mut state = self.state.write().await;
        *state = CircuitState::Closed;
        let mut last_change = self.last_state_change.write().await;
        *last_change = Instant::now();
        let mut failure_history = self.failure_history.lock().await;
        failure_history.clear();
    }
}
impl RequestQueue {
fn new() -> Self {
Self {
high_priority: VecDeque::new(),
normal_priority: VecDeque::new(),
low_priority: VecDeque::new(),
queue_stats: QueueStats {
high_priority_count: 0,
normal_priority_count: 0,
low_priority_count: 0,
total_wait_time: Duration::from_secs(0),
},
}
}
}
impl AdaptationState {
fn new() -> Self {
Self {
response_times: VecDeque::new(),
error_rates: VecDeque::new(),
last_adaptation: Instant::now(),
adaptation_trend: AdaptationTrend::Stable,
}
}
}
impl CircuitBreakerConfig {
    /// Time window used by `record_failure` to decide which failures count
    /// as "recent".
    ///
    /// NOTE(review): this reinterprets `window_size` — a record *count*
    /// (default 100, also used to cap the history deques) — as a number of
    /// *seconds*. Confirm this conflation is intentional; a dedicated
    /// `Duration` config field would be clearer.
    fn window_duration(&self) -> Duration {
        Duration::from_secs(self.window_size as u64)
    }
}
impl Default for RateLimitStats {
    /// All counters start at zero.
    fn default() -> Self {
        Self {
            total_requests: 0,
            allowed_requests: 0,
            delayed_requests: 0,
            denied_requests: 0,
            queue_length: 0,
            avg_wait_time_ms: 0.0,
            current_rps: 0.0,
            adaptations_count: 0,
        }
    }
}
impl Default for CircuitBreakerStats {
    /// All counters start at zero; no per-state durations recorded yet.
    fn default() -> Self {
        Self {
            total_requests: 0,
            successful_requests: 0,
            failed_requests: 0,
            rejected_requests: 0,
            state_durations: HashMap::new(),
            trip_count: 0,
            recovery_attempts: 0,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_adaptive_rate_limiter_creation() {
        let config = AdaptiveRateLimitConfig::default();
        let limiter = AdaptiveRateLimiter::new(config);
        let stats = limiter.get_stats().await;
        assert_eq!(stats.total_requests, 0);
    }

    #[tokio::test]
    async fn test_rate_limit_check() {
        let config = AdaptiveRateLimitConfig {
            base_rps: 1.0,
            ..AdaptiveRateLimitConfig::default()
        };
        let limiter = AdaptiveRateLimiter::new(config);
        let metadata = RequestMetadata {
            priority: RequestPriority::Normal,
            timestamp: Instant::now(),
            request_type: "quote".to_string(),
            expected_duration: Some(Duration::from_millis(100)),
            retry_count: 0,
        };
        let result = limiter.check_rate_limit(metadata).await.unwrap();
        // Fix: the `matches!` result was previously discarded, so this test
        // could never fail; wrap it in an assertion.
        assert!(matches!(result, RateLimitResult::Allowed));
    }

    #[tokio::test]
    async fn test_circuit_breaker_creation() {
        let config = CircuitBreakerConfig::default();
        let breaker = IntelligentCircuitBreaker::new(config);
        assert_eq!(breaker.get_state().await, CircuitState::Closed);
    }

    #[tokio::test]
    async fn test_circuit_breaker_success() {
        let config = CircuitBreakerConfig::default();
        let breaker = IntelligentCircuitBreaker::new(config);
        let result = breaker.call(async { Ok::<i32, YahooError>(42) }).await;
        match result {
            CircuitResult::Success(value) => assert_eq!(value, 42),
            _ => panic!("Expected success"),
        }
    }

    #[tokio::test]
    async fn test_circuit_breaker_failure() {
        let config = CircuitBreakerConfig::default();
        let breaker = IntelligentCircuitBreaker::new(config);
        let error = YahooError::ConnectionFailed("Test error".to_string());
        let result = breaker.call(async { Err::<i32, YahooError>(error) }).await;
        match result {
            CircuitResult::Failure(_) => {}
            _ => panic!("Expected failure"),
        }
    }

    #[tokio::test]
    async fn test_rate_limiter_adaptation() {
        let config = AdaptiveRateLimitConfig::default();
        let limiter = AdaptiveRateLimiter::new(config);
        limiter
            .adapt_rate_limits(Duration::from_secs(10), false)
            .await
            .unwrap();
        let stats = limiter.get_stats().await;
        // The 10-second adaptation cooldown has not elapsed yet, so no
        // adaptation should have been recorded.
        assert_eq!(stats.adaptations_count, 0);
    }
}