use crate::{
adaptive_ratelimit::{AdaptiveRateLimitConfig, AdaptiveRateLimiter},
cache::TtlCache,
content_router::ContentRouter,
network_diag::NetworkMonitor,
peer_selection::PeerSelector,
qos::{Priority, QosConfig, QosManager, RequestInfo},
reputation::{ReputationConfig, ReputationTracker},
utils::{RetryConfig, current_timestamp_ms},
};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex as StdMutex},
time::Duration,
};
use tokio::sync::Mutex;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetrievalStrategy {
BestEffort,
Strict,
Redundant,
Fastest,
}
#[derive(Debug, Clone)]
pub struct OrchestratorConfig {
pub max_concurrent: usize,
pub request_timeout_ms: u64,
pub retry_config: RetryConfig,
pub enable_caching: bool,
pub cache_ttl_secs: u64,
pub max_peers_per_request: usize,
pub min_reputation: f64,
pub enable_qos: bool,
pub qos_config: QosConfig,
}
impl Default for OrchestratorConfig {
#[inline]
fn default() -> Self {
Self {
max_concurrent: 100,
request_timeout_ms: 30_000,
retry_config: RetryConfig::default(),
enable_caching: true,
cache_ttl_secs: 300,
max_peers_per_request: 5,
min_reputation: 0.3,
enable_qos: true,
qos_config: QosConfig::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct RetrievalResult {
pub cid: String,
pub total_bytes: u64,
pub peers_used: Vec<String>,
pub duration_ms: u64,
pub complete: bool,
pub retries: u32,
}
#[derive(Debug, Clone, Default)]
pub struct OrchestratorStats {
pub total_requests: u64,
pub successful_requests: u64,
pub failed_requests: u64,
pub cache_hits: u64,
pub total_bytes: u64,
pub total_retries: u64,
pub avg_duration_ms: f64,
}
impl OrchestratorStats {
#[must_use]
#[inline]
pub fn success_rate(&self) -> f64 {
if self.total_requests == 0 {
return 0.0;
}
self.successful_requests as f64 / self.total_requests as f64
}
#[must_use]
#[inline]
pub fn cache_hit_rate(&self) -> f64 {
if self.total_requests == 0 {
return 0.0;
}
self.cache_hits as f64 / self.total_requests as f64
}
}
#[derive(Debug)]
#[allow(dead_code)]
struct RequestContext {
cid: String,
strategy: RetrievalStrategy,
start_time: i64,
peers_tried: HashSet<String>,
bytes_retrieved: u64,
retries: u32,
}
impl RequestContext {
#[must_use]
#[inline]
fn new(cid: String, strategy: RetrievalStrategy) -> Self {
Self {
cid,
strategy,
start_time: current_timestamp_ms(),
peers_tried: HashSet::new(),
bytes_retrieved: 0,
retries: 0,
}
}
#[must_use]
#[inline]
fn elapsed_ms(&self) -> u64 {
current_timestamp_ms().saturating_sub(self.start_time) as u64
}
}
#[allow(dead_code)]
pub struct RequestOrchestrator {
config: OrchestratorConfig,
peer_selector: Arc<StdMutex<PeerSelector>>,
content_router: Arc<StdMutex<ContentRouter>>,
reputation_tracker: Arc<StdMutex<ReputationTracker>>,
network_monitor: Arc<StdMutex<NetworkMonitor>>,
rate_limiter: Arc<StdMutex<AdaptiveRateLimiter>>,
qos_manager: Arc<Mutex<QosManager>>,
failed_peers: Arc<StdMutex<HashMap<String, u32>>>, result_cache: Arc<StdMutex<TtlCache<String, RetrievalResult>>>,
stats: Arc<StdMutex<OrchestratorStats>>,
}
impl RequestOrchestrator {
#[must_use]
pub fn new(config: OrchestratorConfig) -> Self {
let cache_ttl = Duration::from_secs(config.cache_ttl_secs);
Self {
qos_manager: Arc::new(Mutex::new(QosManager::new(config.qos_config.clone()))),
config: config.clone(),
peer_selector: Arc::new(StdMutex::new(PeerSelector::new())),
content_router: Arc::new(StdMutex::new(ContentRouter::new())),
reputation_tracker: Arc::new(StdMutex::new(ReputationTracker::new(
ReputationConfig::default(),
))),
network_monitor: Arc::new(StdMutex::new(NetworkMonitor::new())),
rate_limiter: Arc::new(StdMutex::new(AdaptiveRateLimiter::new(
AdaptiveRateLimitConfig::default(),
))),
failed_peers: Arc::new(StdMutex::new(HashMap::new())),
result_cache: Arc::new(StdMutex::new(TtlCache::new(1000, cache_ttl))),
stats: Arc::new(StdMutex::new(OrchestratorStats::default())),
}
}
pub async fn retrieve_content(
&self,
cid: &str,
strategy: RetrievalStrategy,
priority: Option<Priority>,
) -> Result<RetrievalResult, OrchestratorError> {
if self.config.enable_caching {
let cid_owned = cid.to_string();
if let Some(cached) = self.result_cache.lock().unwrap().get(&cid_owned) {
self.stats.lock().unwrap().cache_hits += 1;
return Ok(cached.clone());
}
}
let request_id = format!("{}:{}", cid, current_timestamp_ms());
if self.config.enable_qos && priority.is_some() {
let qos_request = RequestInfo {
id: request_id.clone(),
cid: cid.to_string(),
size_bytes: 0, priority: priority.unwrap_or_default(),
deadline_ms: None, };
let enqueued = self.qos_manager.lock().await.enqueue(qos_request).await;
if !enqueued {
return Err(OrchestratorError::QueueFull);
}
let _ = self.qos_manager.lock().await.dequeue().await;
}
let mut ctx = RequestContext::new(cid.to_string(), strategy);
let result = match strategy {
RetrievalStrategy::BestEffort => self.retrieve_best_effort(&mut ctx).await,
RetrievalStrategy::Strict => self.retrieve_strict(&mut ctx).await,
RetrievalStrategy::Redundant => self.retrieve_redundant(&mut ctx).await,
RetrievalStrategy::Fastest => self.retrieve_fastest(&mut ctx).await,
};
let mut stats = self.stats.lock().unwrap();
stats.total_requests += 1;
match &result {
Ok(res) => {
stats.successful_requests += 1;
stats.total_bytes += res.total_bytes;
stats.total_retries += res.retries as u64;
let total = stats.successful_requests as f64;
stats.avg_duration_ms =
(stats.avg_duration_ms * (total - 1.0) + res.duration_ms as f64) / total;
if self.config.enable_caching {
let cid_owned = cid.to_string();
self.result_cache
.lock()
.unwrap()
.insert(cid_owned, res.clone());
}
}
Err(_) => {
stats.failed_requests += 1;
}
}
result
}
async fn retrieve_best_effort(
&self,
ctx: &mut RequestContext,
) -> Result<RetrievalResult, OrchestratorError> {
let peers = self.select_peers_for_content(&ctx.cid)?;
for peer in peers.iter().take(self.config.max_peers_per_request) {
if ctx.peers_tried.contains(peer) {
continue;
}
if !self.is_peer_available(peer) {
continue;
}
if !self.check_rate_limit(peer) {
continue;
}
ctx.peers_tried.insert(peer.clone());
match self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
Ok(bytes) => {
ctx.bytes_retrieved += bytes;
self.record_peer_success(peer, bytes, ctx.elapsed_ms());
return Ok(RetrievalResult {
cid: ctx.cid.clone(),
total_bytes: ctx.bytes_retrieved,
peers_used: vec![peer.clone()],
duration_ms: ctx.elapsed_ms(),
complete: true,
retries: ctx.retries,
});
}
Err(_) => {
ctx.retries += 1;
self.record_peer_failure(peer);
continue;
}
}
}
Err(OrchestratorError::NoAvailablePeers)
}
async fn retrieve_strict(
&self,
ctx: &mut RequestContext,
) -> Result<RetrievalResult, OrchestratorError> {
let peers = self.select_peers_for_content(&ctx.cid)?;
for peer in peers.iter().take(self.config.max_peers_per_request) {
if !self.is_peer_available(peer) || !self.check_rate_limit(peer) {
continue;
}
ctx.peers_tried.insert(peer.clone());
match self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
Ok(bytes) => {
self.record_peer_success(peer, bytes, ctx.elapsed_ms());
return Ok(RetrievalResult {
cid: ctx.cid.clone(),
total_bytes: bytes,
peers_used: vec![peer.clone()],
duration_ms: ctx.elapsed_ms(),
complete: true,
retries: ctx.retries,
});
}
Err(_) => {
ctx.retries += 1;
self.record_peer_failure(peer);
}
}
}
Err(OrchestratorError::RetrievalFailed)
}
async fn retrieve_redundant(
&self,
ctx: &mut RequestContext,
) -> Result<RetrievalResult, OrchestratorError> {
let peers = self.select_peers_for_content(&ctx.cid)?;
let redundancy_count = 2.min(peers.len());
let mut successful_peers = Vec::new();
let mut total_bytes = 0;
for peer in peers.iter().take(redundancy_count) {
if !self.is_peer_available(peer) || !self.check_rate_limit(peer) {
continue;
}
ctx.peers_tried.insert(peer.clone());
if let Ok(bytes) = self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
self.record_peer_success(peer, bytes, ctx.elapsed_ms());
successful_peers.push(peer.clone());
total_bytes = bytes; } else {
self.record_peer_failure(peer);
}
}
if successful_peers.len() >= redundancy_count {
Ok(RetrievalResult {
cid: ctx.cid.clone(),
total_bytes,
peers_used: successful_peers,
duration_ms: ctx.elapsed_ms(),
complete: true,
retries: ctx.retries,
})
} else {
Err(OrchestratorError::InsufficientRedundancy)
}
}
async fn retrieve_fastest(
&self,
ctx: &mut RequestContext,
) -> Result<RetrievalResult, OrchestratorError> {
self.retrieve_best_effort(ctx).await
}
fn select_peers_for_content(&self, cid: &str) -> Result<Vec<String>, OrchestratorError> {
let mut router = self.content_router.lock().unwrap();
let peers = router.find_peers(cid, 10);
if peers.is_empty() {
return Err(OrchestratorError::ContentNotFound);
}
let mut reputation = self.reputation_tracker.lock().unwrap();
let qualified: Vec<String> = peers
.into_iter()
.filter(|p| reputation.get_reputation(p) >= self.config.min_reputation)
.collect();
if qualified.is_empty() {
return Err(OrchestratorError::NoQualifiedPeers);
}
Ok(qualified)
}
#[inline]
fn is_peer_available(&self, peer_id: &str) -> bool {
let failures = self.failed_peers.lock().unwrap();
let count = failures.get(peer_id).copied().unwrap_or(0);
count < 5 }
#[inline]
fn check_rate_limit(&self, peer_id: &str) -> bool {
let mut reputation = self.reputation_tracker.lock().unwrap();
let score = reputation.get_reputation(peer_id);
let mut limiter = self.rate_limiter.lock().unwrap();
limiter.check_rate_limit(peer_id, score)
}
#[inline]
fn record_peer_success(&self, peer_id: &str, bytes: u64, latency_ms: u64) {
self.reputation_tracker
.lock()
.unwrap()
.record_success(peer_id.to_string(), bytes);
self.network_monitor
.lock()
.unwrap()
.record_latency(peer_id.to_string(), latency_ms);
self.failed_peers.lock().unwrap().remove(peer_id);
}
#[inline]
fn record_peer_failure(&self, peer_id: &str) {
self.reputation_tracker
.lock()
.unwrap()
.record_failure(peer_id.to_string(), 1000);
let mut failures = self.failed_peers.lock().unwrap();
*failures.entry(peer_id.to_string()).or_insert(0) += 1;
}
async fn retrieve_from_peer(
&self,
_cid: &str,
_peer_id: &str,
_ctx: &RequestContext,
) -> Result<u64, OrchestratorError> {
Ok(1024 * 1024) }
#[must_use]
#[inline]
pub fn stats(&self) -> OrchestratorStats {
self.stats.lock().unwrap().clone()
}
#[inline]
pub fn reset_stats(&self) {
*self.stats.lock().unwrap() = OrchestratorStats::default();
}
#[must_use]
#[inline]
pub async fn qos_metrics(&self, priority: Priority) -> Option<crate::qos::SlaMetrics> {
if !self.config.enable_qos {
return None;
}
self.qos_manager.lock().await.get_sla_metrics(priority)
}
#[inline]
pub fn clear_cache(&self) {
self.result_cache.lock().unwrap().clear();
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestratorError {
ContentNotFound,
NoAvailablePeers,
NoQualifiedPeers,
RetrievalFailed,
InsufficientRedundancy,
Timeout,
RateLimitExceeded,
QueueFull,
}
impl std::fmt::Display for OrchestratorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ContentNotFound => write!(f, "Content not found"),
Self::NoAvailablePeers => write!(f, "No available peers"),
Self::NoQualifiedPeers => write!(f, "No qualified peers"),
Self::RetrievalFailed => write!(f, "Retrieval failed"),
Self::InsufficientRedundancy => write!(f, "Insufficient redundancy"),
Self::Timeout => write!(f, "Request timeout"),
Self::RateLimitExceeded => write!(f, "Rate limit exceeded"),
Self::QueueFull => write!(f, "QoS queue is full"),
}
}
}
impl std::error::Error for OrchestratorError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_orchestrator_config_default() {
let config = OrchestratorConfig::default();
assert_eq!(config.max_concurrent, 100);
assert_eq!(config.request_timeout_ms, 30_000);
assert!(config.enable_caching);
assert_eq!(config.cache_ttl_secs, 300);
}
#[test]
fn test_orchestrator_creation() {
let config = OrchestratorConfig::default();
let orchestrator = RequestOrchestrator::new(config);
let stats = orchestrator.stats();
assert_eq!(stats.total_requests, 0);
}
#[test]
fn test_orchestrator_stats() {
let mut stats = OrchestratorStats::default();
assert_eq!(stats.success_rate(), 0.0);
assert_eq!(stats.cache_hit_rate(), 0.0);
stats.total_requests = 100;
stats.successful_requests = 80;
stats.cache_hits = 20;
assert_eq!(stats.success_rate(), 0.8);
assert_eq!(stats.cache_hit_rate(), 0.2);
}
#[test]
fn test_request_context() {
let ctx = RequestContext::new("QmTest".to_string(), RetrievalStrategy::BestEffort);
assert_eq!(ctx.cid, "QmTest");
assert_eq!(ctx.strategy, RetrievalStrategy::BestEffort);
assert_eq!(ctx.peers_tried.len(), 0);
assert_eq!(ctx.bytes_retrieved, 0);
assert_eq!(ctx.retries, 0);
}
#[test]
fn test_retrieval_strategies() {
assert_eq!(RetrievalStrategy::BestEffort, RetrievalStrategy::BestEffort);
assert_ne!(RetrievalStrategy::Strict, RetrievalStrategy::Redundant);
}
#[tokio::test]
async fn test_content_not_found() {
let config = OrchestratorConfig::default();
let orchestrator = RequestOrchestrator::new(config);
let result = orchestrator
.retrieve_content("QmNonExistent", RetrievalStrategy::BestEffort, None)
.await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), OrchestratorError::ContentNotFound);
}
#[test]
fn test_orchestrator_reset_stats() {
let config = OrchestratorConfig::default();
let orchestrator = RequestOrchestrator::new(config);
{
let mut stats = orchestrator.stats.lock().unwrap();
stats.total_requests = 100;
stats.successful_requests = 80;
}
orchestrator.reset_stats();
let stats = orchestrator.stats();
assert_eq!(stats.total_requests, 0);
assert_eq!(stats.successful_requests, 0);
}
#[test]
fn test_orchestrator_clear_cache() {
let config = OrchestratorConfig::default();
let orchestrator = RequestOrchestrator::new(config);
orchestrator.clear_cache();
assert_eq!(orchestrator.result_cache.lock().unwrap().len(), 0);
}
#[test]
fn test_orchestrator_error_display() {
assert_eq!(
OrchestratorError::ContentNotFound.to_string(),
"Content not found"
);
assert_eq!(
OrchestratorError::NoAvailablePeers.to_string(),
"No available peers"
);
assert_eq!(OrchestratorError::Timeout.to_string(), "Request timeout");
}
#[test]
fn test_retrieval_result_clone() {
let result = RetrievalResult {
cid: "QmTest".to_string(),
total_bytes: 1024,
peers_used: vec!["peer1".to_string()],
duration_ms: 100,
complete: true,
retries: 0,
};
let cloned = result.clone();
assert_eq!(cloned.cid, result.cid);
assert_eq!(cloned.total_bytes, result.total_bytes);
}
}