1use crate::{
32 adaptive_ratelimit::{AdaptiveRateLimitConfig, AdaptiveRateLimiter},
33 cache::TtlCache,
34 content_router::ContentRouter,
35 network_diag::NetworkMonitor,
36 peer_selection::PeerSelector,
37 qos::{Priority, QosConfig, QosManager, RequestInfo},
38 reputation::{ReputationConfig, ReputationTracker},
39 utils::{RetryConfig, current_timestamp_ms},
40};
41use std::{
42 collections::{HashMap, HashSet},
43 sync::{Arc, Mutex as StdMutex},
44 time::Duration,
45};
46use tokio::sync::Mutex;
47
/// How aggressively the orchestrator pursues content across candidate peers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetrievalStrategy {
    /// Try qualified peers one at a time; the first success wins and
    /// per-peer failures are tolerated.
    BestEffort,
    /// Sequential attempts like `BestEffort`, but exhausting all peers yields
    /// `RetrievalFailed` rather than `NoAvailablePeers`.
    /// NOTE(review): aside from the error variant and the skipped
    /// `peers_tried` check, this currently matches `BestEffort` — confirm
    /// intended verification semantics.
    Strict,
    /// Retrieve from multiple peers (currently up to 2) and require all of
    /// them to succeed for redundancy.
    Redundant,
    /// Latency-optimized retrieval.
    /// NOTE(review): currently a placeholder that delegates to the
    /// best-effort path (see `retrieve_fastest`).
    Fastest,
}
60
/// Tunables for [`RequestOrchestrator`].
#[derive(Debug, Clone)]
pub struct OrchestratorConfig {
    /// Maximum number of concurrent retrievals.
    /// NOTE(review): not enforced anywhere in this module — confirm where
    /// (or whether) this limit is applied.
    pub max_concurrent: usize,
    /// Per-request timeout budget in milliseconds.
    /// NOTE(review): not referenced in this module's retrieval paths —
    /// presumably consumed by lower layers; verify.
    pub request_timeout_ms: u64,
    /// Retry/backoff policy passed through to retry machinery.
    pub retry_config: RetryConfig,
    /// When true, completed results are cached and served on repeat requests.
    pub enable_caching: bool,
    /// Time-to-live for cached retrieval results, in seconds.
    pub cache_ttl_secs: u64,
    /// Upper bound on peers attempted for a single request.
    pub max_peers_per_request: usize,
    /// Minimum reputation score a peer must have to be considered.
    pub min_reputation: f64,
    /// When true (and a priority is supplied), requests pass through the QoS
    /// queue before retrieval.
    pub enable_qos: bool,
    /// Configuration forwarded to the QoS manager.
    pub qos_config: QosConfig,
}
83
84impl Default for OrchestratorConfig {
85 #[inline]
86 fn default() -> Self {
87 Self {
88 max_concurrent: 100,
89 request_timeout_ms: 30_000,
90 retry_config: RetryConfig::default(),
91 enable_caching: true,
92 cache_ttl_secs: 300,
93 max_peers_per_request: 5,
94 min_reputation: 0.3,
95 enable_qos: true,
96 qos_config: QosConfig::default(),
97 }
98 }
99}
100
/// Outcome of a single successful content retrieval.
#[derive(Debug, Clone)]
pub struct RetrievalResult {
    /// Content identifier that was retrieved.
    pub cid: String,
    /// Total bytes obtained for this request.
    pub total_bytes: u64,
    /// Peers that contributed to the result.
    pub peers_used: Vec<String>,
    /// Wall-clock duration of the retrieval, in milliseconds.
    pub duration_ms: u64,
    /// Whether the retrieval completed fully (always `true` on the success
    /// paths visible in this module).
    pub complete: bool,
    /// Number of failed per-peer attempts before success.
    pub retries: u32,
}
117
/// Aggregate counters across all requests handled by an orchestrator.
#[derive(Debug, Clone, Default)]
pub struct OrchestratorStats {
    /// Requests that reached the retrieval path (cache hits excluded —
    /// the cache-hit path returns before this counter is bumped).
    pub total_requests: u64,
    /// Requests that returned `Ok`.
    pub successful_requests: u64,
    /// Requests that returned `Err`.
    pub failed_requests: u64,
    /// Requests served from the result cache.
    pub cache_hits: u64,
    /// Sum of `total_bytes` over successful results.
    pub total_bytes: u64,
    /// Sum of per-request retry counts over successful results.
    pub total_retries: u64,
    /// Running mean of successful-request duration, in milliseconds.
    pub avg_duration_ms: f64,
}
136
137impl OrchestratorStats {
138 #[must_use]
140 #[inline]
141 pub fn success_rate(&self) -> f64 {
142 if self.total_requests == 0 {
143 return 0.0;
144 }
145 self.successful_requests as f64 / self.total_requests as f64
146 }
147
148 #[must_use]
150 #[inline]
151 pub fn cache_hit_rate(&self) -> f64 {
152 if self.total_requests == 0 {
153 return 0.0;
154 }
155 self.cache_hits as f64 / self.total_requests as f64
156 }
157}
158
/// Mutable per-request state threaded through one retrieval attempt.
#[derive(Debug)]
#[allow(dead_code)]
struct RequestContext {
    // Content identifier being retrieved.
    cid: String,
    // Strategy chosen by the caller (kept for diagnostics).
    strategy: RetrievalStrategy,
    // Wall-clock start in milliseconds, from `current_timestamp_ms`.
    start_time: i64,
    // Peers already attempted, so best-effort never retries the same peer.
    peers_tried: HashSet<String>,
    // Bytes accumulated so far across attempts.
    bytes_retrieved: u64,
    // Failed per-peer attempts so far.
    retries: u32,
}
170
171impl RequestContext {
172 #[must_use]
173 #[inline]
174 fn new(cid: String, strategy: RetrievalStrategy) -> Self {
175 Self {
176 cid,
177 strategy,
178 start_time: current_timestamp_ms(),
179 peers_tried: HashSet::new(),
180 bytes_retrieved: 0,
181 retries: 0,
182 }
183 }
184
185 #[must_use]
186 #[inline]
187 fn elapsed_ms(&self) -> u64 {
188 current_timestamp_ms().saturating_sub(self.start_time) as u64
189 }
190}
191
/// Coordinates content retrieval across peers, combining routing, reputation,
/// rate limiting, optional QoS queueing and a TTL result cache.
///
/// All subsystem handles are `Arc`-wrapped so the orchestrator can be shared
/// across tasks; `qos_manager` uses a `tokio::sync::Mutex` because its guard
/// is held across `.await` points, while the remaining state uses
/// `std::sync::Mutex` for short synchronous critical sections.
#[allow(dead_code)]
pub struct RequestOrchestrator {
    config: OrchestratorConfig,
    // NOTE(review): not referenced by any method in this module — presumably
    // used by code outside this view; verify before removing.
    peer_selector: Arc<StdMutex<PeerSelector>>,
    // Maps CIDs to candidate peers (see `select_peers_for_content`).
    content_router: Arc<StdMutex<ContentRouter>>,
    // Per-peer reputation scores used for qualification and rate limiting.
    reputation_tracker: Arc<StdMutex<ReputationTracker>>,
    // Records per-peer latency samples on success.
    network_monitor: Arc<StdMutex<NetworkMonitor>>,
    // Reputation-aware rate limiter consulted before each attempt.
    rate_limiter: Arc<StdMutex<AdaptiveRateLimiter>>,
    // Async-aware: its guard is held across `.await` in `retrieve_content`.
    qos_manager: Arc<Mutex<QosManager>>,
    // Consecutive failure count per peer; cleared on success, peers with
    // too many failures are skipped (see `is_peer_available`).
    failed_peers: Arc<StdMutex<HashMap<String, u32>>>,
    // TTL cache of completed retrievals keyed by CID.
    result_cache: Arc<StdMutex<TtlCache<String, RetrievalResult>>>,
    // Aggregate counters exposed via `stats()`.
    stats: Arc<StdMutex<OrchestratorStats>>,
}
206
207impl RequestOrchestrator {
208 #[must_use]
210 pub fn new(config: OrchestratorConfig) -> Self {
211 let cache_ttl = Duration::from_secs(config.cache_ttl_secs);
212 Self {
213 qos_manager: Arc::new(Mutex::new(QosManager::new(config.qos_config.clone()))),
214 config: config.clone(),
215 peer_selector: Arc::new(StdMutex::new(PeerSelector::new())),
216 content_router: Arc::new(StdMutex::new(ContentRouter::new())),
217 reputation_tracker: Arc::new(StdMutex::new(ReputationTracker::new(
218 ReputationConfig::default(),
219 ))),
220 network_monitor: Arc::new(StdMutex::new(NetworkMonitor::new())),
221 rate_limiter: Arc::new(StdMutex::new(AdaptiveRateLimiter::new(
222 AdaptiveRateLimitConfig::default(),
223 ))),
224 failed_peers: Arc::new(StdMutex::new(HashMap::new())),
225 result_cache: Arc::new(StdMutex::new(TtlCache::new(1000, cache_ttl))),
226 stats: Arc::new(StdMutex::new(OrchestratorStats::default())),
227 }
228 }
229
230 pub async fn retrieve_content(
235 &self,
236 cid: &str,
237 strategy: RetrievalStrategy,
238 priority: Option<Priority>,
239 ) -> Result<RetrievalResult, OrchestratorError> {
240 if self.config.enable_caching {
242 let cid_owned = cid.to_string();
243 if let Some(cached) = self.result_cache.lock().unwrap().get(&cid_owned) {
244 self.stats.lock().unwrap().cache_hits += 1;
245 return Ok(cached.clone());
246 }
247 }
248
249 let request_id = format!("{}:{}", cid, current_timestamp_ms());
251 if self.config.enable_qos && priority.is_some() {
252 let qos_request = RequestInfo {
253 id: request_id.clone(),
254 cid: cid.to_string(),
255 size_bytes: 0, priority: priority.unwrap_or_default(),
257 deadline_ms: None, };
259
260 let enqueued = self.qos_manager.lock().await.enqueue(qos_request).await;
262 if !enqueued {
263 return Err(OrchestratorError::QueueFull);
265 }
266
267 let _ = self.qos_manager.lock().await.dequeue().await;
271 }
272
273 let mut ctx = RequestContext::new(cid.to_string(), strategy);
274
275 let result = match strategy {
276 RetrievalStrategy::BestEffort => self.retrieve_best_effort(&mut ctx).await,
277 RetrievalStrategy::Strict => self.retrieve_strict(&mut ctx).await,
278 RetrievalStrategy::Redundant => self.retrieve_redundant(&mut ctx).await,
279 RetrievalStrategy::Fastest => self.retrieve_fastest(&mut ctx).await,
280 };
281
282 let mut stats = self.stats.lock().unwrap();
284 stats.total_requests += 1;
285
286 match &result {
287 Ok(res) => {
288 stats.successful_requests += 1;
289 stats.total_bytes += res.total_bytes;
290 stats.total_retries += res.retries as u64;
291
292 let total = stats.successful_requests as f64;
294 stats.avg_duration_ms =
295 (stats.avg_duration_ms * (total - 1.0) + res.duration_ms as f64) / total;
296
297 if self.config.enable_caching {
299 let cid_owned = cid.to_string();
300 self.result_cache
301 .lock()
302 .unwrap()
303 .insert(cid_owned, res.clone());
304 }
305 }
306 Err(_) => {
307 stats.failed_requests += 1;
308 }
309 }
310
311 result
312 }
313
314 async fn retrieve_best_effort(
316 &self,
317 ctx: &mut RequestContext,
318 ) -> Result<RetrievalResult, OrchestratorError> {
319 let peers = self.select_peers_for_content(&ctx.cid)?;
320
321 for peer in peers.iter().take(self.config.max_peers_per_request) {
322 if ctx.peers_tried.contains(peer) {
323 continue;
324 }
325
326 if !self.is_peer_available(peer) {
328 continue;
329 }
330
331 if !self.check_rate_limit(peer) {
333 continue;
334 }
335
336 ctx.peers_tried.insert(peer.clone());
337
338 match self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
340 Ok(bytes) => {
341 ctx.bytes_retrieved += bytes;
342
343 self.record_peer_success(peer, bytes, ctx.elapsed_ms());
345
346 return Ok(RetrievalResult {
348 cid: ctx.cid.clone(),
349 total_bytes: ctx.bytes_retrieved,
350 peers_used: vec![peer.clone()],
351 duration_ms: ctx.elapsed_ms(),
352 complete: true,
353 retries: ctx.retries,
354 });
355 }
356 Err(_) => {
357 ctx.retries += 1;
358 self.record_peer_failure(peer);
359 continue;
360 }
361 }
362 }
363
364 Err(OrchestratorError::NoAvailablePeers)
365 }
366
367 async fn retrieve_strict(
369 &self,
370 ctx: &mut RequestContext,
371 ) -> Result<RetrievalResult, OrchestratorError> {
372 let peers = self.select_peers_for_content(&ctx.cid)?;
373
374 for peer in peers.iter().take(self.config.max_peers_per_request) {
375 if !self.is_peer_available(peer) || !self.check_rate_limit(peer) {
376 continue;
377 }
378
379 ctx.peers_tried.insert(peer.clone());
380
381 match self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
382 Ok(bytes) => {
383 self.record_peer_success(peer, bytes, ctx.elapsed_ms());
384
385 return Ok(RetrievalResult {
386 cid: ctx.cid.clone(),
387 total_bytes: bytes,
388 peers_used: vec![peer.clone()],
389 duration_ms: ctx.elapsed_ms(),
390 complete: true,
391 retries: ctx.retries,
392 });
393 }
394 Err(_) => {
395 ctx.retries += 1;
396 self.record_peer_failure(peer);
397 }
398 }
399 }
400
401 Err(OrchestratorError::RetrievalFailed)
402 }
403
404 async fn retrieve_redundant(
406 &self,
407 ctx: &mut RequestContext,
408 ) -> Result<RetrievalResult, OrchestratorError> {
409 let peers = self.select_peers_for_content(&ctx.cid)?;
410 let redundancy_count = 2.min(peers.len());
411
412 let mut successful_peers = Vec::new();
413 let mut total_bytes = 0;
414
415 for peer in peers.iter().take(redundancy_count) {
416 if !self.is_peer_available(peer) || !self.check_rate_limit(peer) {
417 continue;
418 }
419
420 ctx.peers_tried.insert(peer.clone());
421
422 if let Ok(bytes) = self.retrieve_from_peer(&ctx.cid, peer, ctx).await {
423 self.record_peer_success(peer, bytes, ctx.elapsed_ms());
424 successful_peers.push(peer.clone());
425 total_bytes = bytes; } else {
427 self.record_peer_failure(peer);
428 }
429 }
430
431 if successful_peers.len() >= redundancy_count {
432 Ok(RetrievalResult {
433 cid: ctx.cid.clone(),
434 total_bytes,
435 peers_used: successful_peers,
436 duration_ms: ctx.elapsed_ms(),
437 complete: true,
438 retries: ctx.retries,
439 })
440 } else {
441 Err(OrchestratorError::InsufficientRedundancy)
442 }
443 }
444
445 async fn retrieve_fastest(
447 &self,
448 ctx: &mut RequestContext,
449 ) -> Result<RetrievalResult, OrchestratorError> {
450 self.retrieve_best_effort(ctx).await
452 }
453
454 fn select_peers_for_content(&self, cid: &str) -> Result<Vec<String>, OrchestratorError> {
456 let mut router = self.content_router.lock().unwrap();
457 let peers = router.find_peers(cid, 10);
458
459 if peers.is_empty() {
460 return Err(OrchestratorError::ContentNotFound);
461 }
462
463 let mut reputation = self.reputation_tracker.lock().unwrap();
465 let qualified: Vec<String> = peers
466 .into_iter()
467 .filter(|p| reputation.get_reputation(p) >= self.config.min_reputation)
468 .collect();
469
470 if qualified.is_empty() {
471 return Err(OrchestratorError::NoQualifiedPeers);
472 }
473
474 Ok(qualified)
476 }
477
478 #[inline]
480 fn is_peer_available(&self, peer_id: &str) -> bool {
481 let failures = self.failed_peers.lock().unwrap();
482 let count = failures.get(peer_id).copied().unwrap_or(0);
483 count < 5 }
485
486 #[inline]
488 fn check_rate_limit(&self, peer_id: &str) -> bool {
489 let mut reputation = self.reputation_tracker.lock().unwrap();
490 let score = reputation.get_reputation(peer_id);
491
492 let mut limiter = self.rate_limiter.lock().unwrap();
493 limiter.check_rate_limit(peer_id, score)
494 }
495
496 #[inline]
498 fn record_peer_success(&self, peer_id: &str, bytes: u64, latency_ms: u64) {
499 self.reputation_tracker
501 .lock()
502 .unwrap()
503 .record_success(peer_id.to_string(), bytes);
504
505 self.network_monitor
507 .lock()
508 .unwrap()
509 .record_latency(peer_id.to_string(), latency_ms);
510
511 self.failed_peers.lock().unwrap().remove(peer_id);
513 }
514
515 #[inline]
517 fn record_peer_failure(&self, peer_id: &str) {
518 self.reputation_tracker
520 .lock()
521 .unwrap()
522 .record_failure(peer_id.to_string(), 1000);
523
524 let mut failures = self.failed_peers.lock().unwrap();
526 *failures.entry(peer_id.to_string()).or_insert(0) += 1;
527 }
528
529 async fn retrieve_from_peer(
531 &self,
532 _cid: &str,
533 _peer_id: &str,
534 _ctx: &RequestContext,
535 ) -> Result<u64, OrchestratorError> {
536 Ok(1024 * 1024) }
546
547 #[must_use]
549 #[inline]
550 pub fn stats(&self) -> OrchestratorStats {
551 self.stats.lock().unwrap().clone()
552 }
553
554 #[inline]
556 pub fn reset_stats(&self) {
557 *self.stats.lock().unwrap() = OrchestratorStats::default();
558 }
559
560 #[must_use]
564 #[inline]
565 pub async fn qos_metrics(&self, priority: Priority) -> Option<crate::qos::SlaMetrics> {
566 if !self.config.enable_qos {
567 return None;
568 }
569 self.qos_manager.lock().await.get_sla_metrics(priority)
570 }
571
572 #[inline]
574 pub fn clear_cache(&self) {
575 self.result_cache.lock().unwrap().clear();
576 }
577}
578
/// Failure modes surfaced by [`RequestOrchestrator`] retrieval paths.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestratorError {
    /// The content router produced no peers at all for the CID.
    ContentNotFound,
    /// Best-effort retrieval exhausted its candidates without a success.
    NoAvailablePeers,
    /// Candidates existed but all fell below the reputation threshold.
    NoQualifiedPeers,
    /// Strict retrieval exhausted its candidates without a success.
    RetrievalFailed,
    /// Redundant retrieval could not obtain the required number of copies.
    InsufficientRedundancy,
    /// Request exceeded its time budget.
    /// NOTE(review): not produced anywhere in this module — verify the
    /// timeout path that emits it.
    Timeout,
    /// Rate limiting rejected the request.
    /// NOTE(review): rate-limited peers are silently skipped here rather
    /// than producing this variant — verify intended usage.
    RateLimitExceeded,
    /// The QoS admission queue refused the request.
    QueueFull,
}
599
600impl std::fmt::Display for OrchestratorError {
601 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602 match self {
603 Self::ContentNotFound => write!(f, "Content not found"),
604 Self::NoAvailablePeers => write!(f, "No available peers"),
605 Self::NoQualifiedPeers => write!(f, "No qualified peers"),
606 Self::RetrievalFailed => write!(f, "Retrieval failed"),
607 Self::InsufficientRedundancy => write!(f, "Insufficient redundancy"),
608 Self::Timeout => write!(f, "Request timeout"),
609 Self::RateLimitExceeded => write!(f, "Rate limit exceeded"),
610 Self::QueueFull => write!(f, "QoS queue is full"),
611 }
612 }
613}
614
615impl std::error::Error for OrchestratorError {}
616
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_orchestrator_config_default() {
        let cfg = OrchestratorConfig::default();
        assert_eq!(cfg.max_concurrent, 100);
        assert_eq!(cfg.request_timeout_ms, 30_000);
        assert!(cfg.enable_caching);
        assert_eq!(cfg.cache_ttl_secs, 300);
    }

    #[test]
    fn test_orchestrator_creation() {
        // A fresh orchestrator starts with zeroed counters.
        let orchestrator = RequestOrchestrator::new(OrchestratorConfig::default());
        assert_eq!(orchestrator.stats().total_requests, 0);
    }

    #[test]
    fn test_orchestrator_stats() {
        let mut s = OrchestratorStats::default();
        // Both rates are defined as 0.0 when no requests were recorded.
        assert_eq!(s.success_rate(), 0.0);
        assert_eq!(s.cache_hit_rate(), 0.0);

        s.total_requests = 100;
        s.successful_requests = 80;
        s.cache_hits = 20;

        assert_eq!(s.success_rate(), 0.8);
        assert_eq!(s.cache_hit_rate(), 0.2);
    }

    #[test]
    fn test_request_context() {
        let context = RequestContext::new("QmTest".to_string(), RetrievalStrategy::BestEffort);
        assert_eq!(context.cid, "QmTest");
        assert_eq!(context.strategy, RetrievalStrategy::BestEffort);
        assert!(context.peers_tried.is_empty());
        assert_eq!(context.bytes_retrieved, 0);
        assert_eq!(context.retries, 0);
    }

    #[test]
    fn test_retrieval_strategies() {
        assert_eq!(RetrievalStrategy::BestEffort, RetrievalStrategy::BestEffort);
        assert_ne!(RetrievalStrategy::Strict, RetrievalStrategy::Redundant);
    }

    #[tokio::test]
    async fn test_content_not_found() {
        let orchestrator = RequestOrchestrator::new(OrchestratorConfig::default());

        // No routing entries exist, so any CID is unroutable.
        let outcome = orchestrator
            .retrieve_content("QmNonExistent", RetrievalStrategy::BestEffort, None)
            .await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err(), OrchestratorError::ContentNotFound);
    }

    #[test]
    fn test_orchestrator_reset_stats() {
        let orchestrator = RequestOrchestrator::new(OrchestratorConfig::default());

        {
            let mut counters = orchestrator.stats.lock().unwrap();
            counters.total_requests = 100;
            counters.successful_requests = 80;
        }

        orchestrator.reset_stats();
        let after = orchestrator.stats();
        assert_eq!(after.total_requests, 0);
        assert_eq!(after.successful_requests, 0);
    }

    #[test]
    fn test_orchestrator_clear_cache() {
        let orchestrator = RequestOrchestrator::new(OrchestratorConfig::default());

        orchestrator.clear_cache();
        assert_eq!(orchestrator.result_cache.lock().unwrap().len(), 0);
    }

    #[test]
    fn test_orchestrator_error_display() {
        let cases = [
            (OrchestratorError::ContentNotFound, "Content not found"),
            (OrchestratorError::NoAvailablePeers, "No available peers"),
            (OrchestratorError::Timeout, "Request timeout"),
        ];
        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    #[test]
    fn test_retrieval_result_clone() {
        let original = RetrievalResult {
            cid: "QmTest".to_string(),
            total_bytes: 1024,
            peers_used: vec!["peer1".to_string()],
            duration_ms: 100,
            complete: true,
            retries: 0,
        };

        let copy = original.clone();
        assert_eq!(copy.cid, original.cid);
        assert_eq!(copy.total_bytes, original.total_bytes);
    }
}