use crate::{
YahooConnector, YahooError,
circuit_breaker::{CircuitBreaker, CircuitBreakerConfig},
connection_pool::{ConnectionPool, ConnectionPoolConfig},
observability::{ObservabilityConfig, ObservabilityManager, RequestContext},
rate_limiter::{RateLimitConfig, RateLimiter},
request_deduplication::{DeduplicationConfig, RequestDeduplicator},
response_cache::{ResponseCache, ResponseCacheConfig},
retry::{RetryConfig, RetryPolicy},
};
use log::{debug, info};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use url;
/// Aggregated configuration for [`EnterpriseYahooConnector`].
///
/// Holds one configuration value per subsystem plus a single master switch
/// that decides whether the optional subsystems are constructed at all.
#[derive(Debug, Clone)]
pub struct EnterpriseConfig {
/// Settings handed to `CircuitBreaker::new` when features are enabled.
pub circuit_breaker: CircuitBreakerConfig,
/// Settings handed to `RetryPolicy::new` when features are enabled.
pub retry: RetryConfig,
/// Settings handed to `RequestDeduplicator::new` when features are enabled.
pub deduplication: DeduplicationConfig,
/// Settings handed to `ResponseCache::new` when features are enabled.
pub response_cache: ResponseCacheConfig,
/// Settings handed to `ObservabilityManager::new`; this one is always used,
/// regardless of `enable_all_features`.
pub observability: ObservabilityConfig,
/// Settings handed to `ConnectionPool::new` when features are enabled.
pub connection_pool: ConnectionPoolConfig,
/// Settings handed to `YahooConnector::with_custom_rate_limiting` when
/// features are enabled.
pub rate_limiter: RateLimitConfig,
/// Master switch. When `false`, `EnterpriseYahooConnector::new` builds a plain
/// `YahooConnector::new()` and leaves the circuit breaker, retry policy,
/// deduplicator, response cache, connection pool and rate limiter as `None`.
pub enable_all_features: bool,
}
impl Default for EnterpriseConfig {
fn default() -> Self {
Self {
circuit_breaker: CircuitBreakerConfig::default(),
retry: RetryConfig::default(),
deduplication: DeduplicationConfig::default(),
response_cache: ResponseCacheConfig::default(),
observability: ObservabilityConfig::default(),
connection_pool: ConnectionPoolConfig::default(),
rate_limiter: RateLimitConfig::default(),
enable_all_features: true,
}
}
}
impl EnterpriseConfig {
    /// Preset with all features enabled, built from
    /// `CircuitBreakerConfig::high_availability`, `RetryConfig::conservative`,
    /// `DeduplicationConfig::aggressive_caching`,
    /// `ResponseCacheConfig::conservative` and
    /// `ConnectionPoolConfig::high_throughput`; observability and rate
    /// limiting use their defaults.
    pub fn production_high_availability() -> Self {
        let circuit_breaker = CircuitBreakerConfig::high_availability();
        let retry = RetryConfig::conservative();
        let deduplication = DeduplicationConfig::aggressive_caching();
        let response_cache = ResponseCacheConfig::conservative();
        let observability = ObservabilityConfig::default();
        let connection_pool = ConnectionPoolConfig::high_throughput();
        let rate_limiter = RateLimitConfig::default();

        Self {
            circuit_breaker,
            retry,
            deduplication,
            response_cache,
            observability,
            connection_pool,
            rate_limiter,
            enable_all_features: true,
        }
    }

    /// Preset with all features enabled, built from the `development()`
    /// variant of each subsystem config where one is used here
    /// (`RetryConfig::aggressive` for retries, default observability).
    pub fn development() -> Self {
        let circuit_breaker = CircuitBreakerConfig::development();
        let retry = RetryConfig::aggressive();
        let deduplication = DeduplicationConfig::development();
        let response_cache = ResponseCacheConfig::development();
        let observability = ObservabilityConfig::default();
        let connection_pool = ConnectionPoolConfig::development();
        let rate_limiter = RateLimitConfig::development();

        Self {
            circuit_breaker,
            retry,
            deduplication,
            response_cache,
            observability,
            connection_pool,
            rate_limiter,
            enable_all_features: true,
        }
    }

    /// Preset with `enable_all_features` set to `false`.
    ///
    /// The sub-configs are still populated (`fault_tolerant`, `no_retry`,
    /// `disabled`, `low_resource`, defaults) so the value is complete.
    pub fn minimal() -> Self {
        let circuit_breaker = CircuitBreakerConfig::fault_tolerant();
        let retry = RetryConfig::no_retry();
        let deduplication = DeduplicationConfig::disabled();
        let response_cache = ResponseCacheConfig::disabled();
        let observability = ObservabilityConfig::default();
        let connection_pool = ConnectionPoolConfig::low_resource();
        let rate_limiter = RateLimitConfig::default();

        Self {
            circuit_breaker,
            retry,
            deduplication,
            response_cache,
            observability,
            connection_pool,
            rate_limiter,
            enable_all_features: false,
        }
    }
}
/// Connector that layers optional resilience components (circuit breaker,
/// retry policy, request deduplication, response cache, connection pool,
/// rate limiter) and an always-present observability manager around a
/// `YahooConnector`.
///
/// Every `Option` field is `None` when the connector was built with
/// `EnterpriseConfig::enable_all_features == false`.
pub struct EnterpriseYahooConnector {
/// Underlying connector. After construction none of the methods visible in
/// this file read it, which is why the `dead_code` lint is silenced.
#[allow(dead_code)]
base_connector: YahooConnector,
/// Circuit breaker used for the first attempt of each request.
circuit_breaker: Option<Arc<CircuitBreaker>>,
/// Retry policy; behind an `RwLock` because executing it takes a write lock
/// while reading its stats takes a read lock.
retry_policy: Option<Arc<RwLock<RetryPolicy>>>,
/// Request deduplicator; in this file it is only queried for stats and
/// cleared on shutdown.
deduplicator: Option<Arc<RequestDeduplicator>>,
/// Response cache consulted before, and filled after, each request.
response_cache: Option<Arc<ResponseCache>>,
/// Logging / metrics / health sink; always present.
observability: Arc<ObservabilityManager>,
/// Connection pool through which requests are dispatched per host.
connection_pool: Option<Arc<ConnectionPool>>,
/// Rate limiter cloned from `base_connector.rate_limiter` at construction.
rate_limiter: Option<Arc<RateLimiter>>,
/// The configuration this connector was built from.
config: EnterpriseConfig,
}
impl EnterpriseYahooConnector {
/// Builds a connector from `config`.
///
/// When `config.enable_all_features` is `true`, the base connector is created
/// with custom rate limiting and every optional component is constructed
/// from its section of `config`. When it is `false`, a plain
/// `YahooConnector::new()` is used and all optional components are `None`.
/// The observability manager is created in both cases.
///
/// # Errors
///
/// Propagates the `YahooError` from constructing the base `YahooConnector`.
pub fn new(config: EnterpriseConfig) -> Result<Self, YahooError> {
    info!("Initializing enterprise Yahoo Finance connector");

    let enabled = config.enable_all_features;

    let base_connector = if enabled {
        YahooConnector::with_custom_rate_limiting(config.rate_limiter.clone())?
    } else {
        YahooConnector::new()?
    };

    // `bool::then` runs the closure only when `enabled` is true, so nothing
    // is constructed for a disabled connector.
    let circuit_breaker =
        enabled.then(|| Arc::new(CircuitBreaker::new(config.circuit_breaker.clone())));
    let retry_policy =
        enabled.then(|| Arc::new(RwLock::new(RetryPolicy::new(config.retry.clone()))));
    let deduplicator =
        enabled.then(|| Arc::new(RequestDeduplicator::new(config.deduplication.clone())));
    let response_cache =
        enabled.then(|| Arc::new(ResponseCache::new(config.response_cache.clone())));
    let connection_pool =
        enabled.then(|| Arc::new(ConnectionPool::new(config.connection_pool.clone())));

    // Share the limiter owned by the base connector rather than creating a
    // second one.
    let rate_limiter = enabled
        .then(|| base_connector.rate_limiter.clone())
        .flatten();

    let observability = Arc::new(ObservabilityManager::new(config.observability.clone()));

    info!(
        "Enterprise connector initialized with all features enabled: {}",
        config.enable_all_features
    );

    Ok(Self {
        base_connector,
        circuit_breaker,
        retry_policy,
        deduplicator,
        response_cache,
        observability,
        connection_pool,
        rate_limiter,
        config,
    })
}
/// Builds a connector from `EnterpriseConfig::default()`.
///
/// # Errors
///
/// Returns whatever error `Self::new` returns.
pub fn with_default_config() -> Result<Self, YahooError> {
    let config = EnterpriseConfig::default();
    Self::new(config)
}
/// Builds a connector from `EnterpriseConfig::production_high_availability()`.
///
/// # Errors
///
/// Returns whatever error `Self::new` returns.
pub fn for_production() -> Result<Self, YahooError> {
    let config = EnterpriseConfig::production_high_availability();
    Self::new(config)
}
/// Builds a connector from `EnterpriseConfig::development()`.
///
/// # Errors
///
/// Returns whatever error `Self::new` returns.
pub fn for_development() -> Result<Self, YahooError> {
    let config = EnterpriseConfig::development();
    Self::new(config)
}
/// Executes a request, consulting the response cache first.
///
/// A cached value (looked up by `url` and `params`) is returned immediately
/// and logged as a cache hit plus a successful request. Otherwise the
/// request goes through `execute_with_resilience`; a successful response is
/// stored in the cache, and the outcome is logged either way.
///
/// # Errors
///
/// Returns the error produced by `execute_with_resilience`.
pub async fn execute_request(
    &self,
    method: &str,
    url: &str,
    params: Option<HashMap<String, String>>,
) -> Result<Value, YahooError> {
    let context = RequestContext::new(method.to_string(), url.to_string());
    self.observability.log_request_start(&context);

    // Fast path: serve from the cache when one is configured and has an entry.
    if let Some(cache) = &self.response_cache {
        match cache.get(url, params.as_ref()).await {
            Some(hit) => {
                self.observability.log_cache_hit(url);
                self.observability.log_request_success(&context, None);
                return Ok(hit);
            }
            None => {
                self.observability.log_cache_miss(url);
            }
        }
    }

    // `params` is cloned because it is still needed for the cache write below.
    let outcome = self
        .execute_with_resilience(method, url, params.clone())
        .await;

    match &outcome {
        Err(failure) => {
            self.observability.log_request_error(&context, failure);
        }
        Ok(payload) => {
            if let Some(cache) = &self.response_cache {
                cache.put(url, params.as_ref(), payload.clone()).await;
            }
            self.observability.log_request_success(&context, None);
        }
    }

    outcome
}
async fn execute_with_resilience(
&self,
method: &str,
url: &str,
params: Option<HashMap<String, String>>,
) -> Result<Value, YahooError> {
let operation = || async {
if let Some(pool) = &self.connection_pool {
let host = self.extract_host(url)?;
pool.execute_request(&host, |_client| async {
self.execute_base_request(method, url, ¶ms).await
})
.await
} else {
self.execute_base_request(method, url, ¶ms).await
}
};
let circuit_result = if let Some(cb) = &self.circuit_breaker {
cb.execute(operation).await
} else {
operation().await
};
match circuit_result {
Ok(response) => Ok(response),
Err(error) => {
if let Some(retry_policy) = &self.retry_policy {
let mut policy = retry_policy.write().await;
policy.execute(|| async { operation().await }).await
} else {
Err(error)
}
}
}
}
/// Performs the innermost request step: rate limiting followed by a mock
/// response.
///
/// If a rate limiter is configured and no permit is available, the hit is
/// logged, the task sleeps for 1000 ms and tries once more. Only `"GET"` is
/// supported; the JSON returned is a fixed mock chosen by whether `url`
/// contains `"search"` or `"chart"`. `_params` is currently unused.
///
/// # Errors
///
/// * `YahooError::TooManyRequests` when the second permit attempt also fails.
/// * `YahooError::FetchFailed` for any method other than `"GET"`.
async fn execute_base_request(
    &self,
    method: &str,
    url: &str,
    _params: &Option<HashMap<String, String>>,
) -> Result<Value, YahooError> {
    if let Some(limiter) = &self.rate_limiter {
        if limiter.try_acquire_permit().is_err() {
            let wait_ms = 1000;
            self.observability.log_rate_limit_hit(wait_ms);
            tokio::time::sleep(tokio::time::Duration::from_millis(wait_ms)).await;

            // One retry only: give up if the limiter is still saturated.
            if limiter.try_acquire_permit().is_err() {
                return Err(YahooError::TooManyRequests(
                    "Rate limit still exceeded after waiting".to_string(),
                ));
            }
        }
    }

    if method != "GET" {
        return Err(YahooError::FetchFailed(format!(
            "Unsupported HTTP method: {}",
            method
        )));
    }

    let payload = if url.contains("search") {
        serde_json::json!({"mock": "search_response"})
    } else if url.contains("chart") {
        serde_json::json!({"mock": "chart_response"})
    } else {
        serde_json::json!({"mock": "generic_response", "url": url})
    };

    Ok(payload)
}
/// Parses `url` and returns its host component as an owned string.
///
/// # Errors
///
/// Returns `YahooError::InvalidUrl` when `url` cannot be parsed or has no host.
fn extract_host(&self, url: &str) -> Result<String, YahooError> {
    let parsed = match url::Url::parse(url) {
        Ok(parsed) => parsed,
        Err(_) => return Err(YahooError::InvalidUrl),
    };

    match parsed.host_str() {
        Some(host) => Ok(host.to_string()),
        None => Err(YahooError::InvalidUrl),
    }
}
/// Collects a point-in-time health snapshot of the connector.
///
/// The overall health comes from the observability manager. Each remaining
/// field is `Some` only when its component is configured.
pub async fn health_check(&self) -> EnterpriseHealthStatus {
    let overall_health = self.observability.health_check().await;

    let circuit_breaker_state = match &self.circuit_breaker {
        Some(breaker) => Some(breaker.state().await),
        None => None,
    };

    let rate_limiter_status = self.rate_limiter.as_ref().map(|limiter| limiter.status());

    let cache_stats = match &self.response_cache {
        Some(cache) => Some(cache.stats().await),
        None => None,
    };

    let connection_stats = match &self.connection_pool {
        Some(pool) => Some(pool.stats().await),
        None => None,
    };

    EnterpriseHealthStatus {
        overall_health,
        circuit_breaker_state,
        rate_limiter_status,
        cache_stats,
        connection_stats,
        features_enabled: self.config.enable_all_features,
    }
}
/// Gathers metrics from the observability manager and from each configured
/// component (circuit breaker, retry policy, deduplicator).
///
/// Components that are not configured contribute `None`.
pub async fn get_metrics(&self) -> EnterpriseMetrics {
    let observability = self.observability.get_metrics().await;

    let circuit_breaker = match &self.circuit_breaker {
        Some(breaker) => Some(breaker.stats().await),
        None => None,
    };

    let retry = match &self.retry_policy {
        Some(shared_policy) => {
            // A read lock suffices: the stats are only cloned out.
            let guard = shared_policy.read().await;
            Some(guard.stats().clone())
        }
        None => None,
    };

    let deduplication = match &self.deduplicator {
        Some(deduplicator) => Some(deduplicator.stats().await),
        None => None,
    };

    EnterpriseMetrics {
        observability,
        circuit_breaker,
        retry,
        deduplication,
    }
}
/// Runs periodic housekeeping on the configured components.
///
/// Expired cache entries and idle pooled connections are cleaned up. The
/// circuit breaker is only inspected: a debug line is logged when it has
/// recorded failures but its success rate is above 0.8. The deduplicator has
/// no maintenance step here.
pub async fn maintenance(&self) {
    info!("Running maintenance operations");

    if let Some(cache) = self.response_cache.as_deref() {
        cache.cleanup_expired().await;
    }

    if let Some(pool) = self.connection_pool.as_deref() {
        pool.cleanup_idle_connections().await;
    }

    if let Some(breaker) = self.circuit_breaker.as_deref() {
        let stats = breaker.stats().await;
        let recovering = stats.failed_requests > 0 && stats.success_rate() > 0.8;
        if recovering {
            debug!("Circuit breaker has good success rate, considering reset");
        }
    }

    info!("Maintenance operations completed");
}
/// Shuts the connector down: the connection pool first, then the response
/// cache is cleared, then the deduplicator's cache is cleared.
///
/// Components that are not configured are skipped.
pub async fn shutdown(&self) {
    info!("Shutting down enterprise connector");

    if let Some(pool) = self.connection_pool.as_deref() {
        pool.shutdown().await;
    }

    if let Some(cache) = self.response_cache.as_deref() {
        cache.clear().await;
    }

    if let Some(deduplicator) = self.deduplicator.as_deref() {
        deduplicator.clear_cache().await;
    }

    info!("Enterprise connector shutdown completed");
}
}
/// Health snapshot returned by `EnterpriseYahooConnector::health_check`.
///
/// Every `Option` field is `None` when the corresponding component is not
/// configured on the connector.
#[derive(Debug, Clone)]
pub struct EnterpriseHealthStatus {
/// Result of the observability manager's own health check.
pub overall_health: crate::observability::HealthCheck,
/// Circuit breaker state at the time of the check.
pub circuit_breaker_state: Option<crate::circuit_breaker::CircuitState>,
/// Status reported by the rate limiter.
pub rate_limiter_status: Option<crate::rate_limiter::RateLimitStatus>,
/// Statistics reported by the response cache.
pub cache_stats: Option<crate::response_cache::CacheStats>,
/// Statistics reported by the connection pool.
pub connection_stats: Option<crate::connection_pool::ConnectionStats>,
/// Copy of the connector's `enable_all_features` configuration flag.
pub features_enabled: bool,
}
/// Metrics bundle returned by `EnterpriseYahooConnector::get_metrics`.
///
/// Every `Option` field is `None` when the corresponding component is not
/// configured on the connector.
#[derive(Debug, Clone)]
pub struct EnterpriseMetrics {
/// Metrics reported by the observability manager.
pub observability: crate::observability::LibraryMetrics,
/// Statistics reported by the circuit breaker.
pub circuit_breaker: Option<crate::circuit_breaker::CircuitBreakerStats>,
/// Clone of the retry policy's statistics.
pub retry: Option<crate::retry::RetryStats>,
/// Statistics reported by the request deduplicator.
pub deduplication: Option<crate::request_deduplication::DeduplicationStats>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must produce a connector.
    #[tokio::test]
    async fn test_enterprise_connector_creation() {
        assert!(EnterpriseYahooConnector::with_default_config().is_ok());
    }

    /// The production preset builds and keeps all features enabled.
    #[tokio::test]
    async fn test_production_config() {
        let built = EnterpriseYahooConnector::for_production();
        assert!(built.is_ok());
        assert!(built.unwrap().config.enable_all_features);
    }

    /// The development preset builds and keeps all features enabled.
    #[tokio::test]
    async fn test_development_config() {
        let built = EnterpriseYahooConnector::for_development();
        assert!(built.is_ok());
        assert!(built.unwrap().config.enable_all_features);
    }

    /// The minimal preset builds with features disabled.
    #[test]
    fn test_minimal_config() {
        let built = EnterpriseYahooConnector::new(EnterpriseConfig::minimal());
        assert!(built.is_ok());
        assert!(!built.unwrap().config.enable_all_features);
    }

    /// Each preset sets the master feature switch as expected.
    #[test]
    fn test_config_presets() {
        assert!(EnterpriseConfig::production_high_availability().enable_all_features);
        assert!(EnterpriseConfig::development().enable_all_features);
        assert!(!EnterpriseConfig::minimal().enable_all_features);
    }
}