1use std::collections::HashMap;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
37use std::time::{Duration, Instant};
38
39use bytes::Bytes;
40
41#[cfg(feature = "async")]
42use crate::backends::CloudStorageBackend;
43use crate::error::{CloudError, Result};
44
/// Storage back-end family a provider configuration talks to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CloudProvider {
    /// Amazon S3.
    AwsS3,
    /// Azure Blob Storage.
    AzureBlob,
    /// Google Cloud Storage.
    Gcs,
    /// Plain HTTP/HTTPS origin.
    Http,
    /// User-defined provider.
    Custom,
}

impl CloudProvider {
    /// Infers the provider from a URL, case-insensitively.
    ///
    /// * `s3://` → [`CloudProvider::AwsS3`]
    /// * `az://` / `azure://` → [`CloudProvider::AzureBlob`]
    /// * `gs://` / `gcs://` → [`CloudProvider::Gcs`]
    /// * `http://` / `https://` → classified by **host only** (S3, Azure Blob or GCS
    ///   host patterns), otherwise [`CloudProvider::Http`]
    ///
    /// Returns `None` for any other scheme.
    #[must_use]
    pub fn from_url(url: &str) -> Option<Self> {
        // Extracts the host from the text following `scheme://`, dropping any path,
        // query, fragment and `user:pass@` credentials so they cannot influence detection.
        fn host_of(after_scheme: &str) -> &str {
            let authority = after_scheme
                .split(|c: char| matches!(c, '/' | '?' | '#'))
                .next()
                .unwrap_or("");
            authority.rsplit('@').next().unwrap_or(authority)
        }

        let lower = url.to_lowercase();

        if lower.starts_with("s3://") {
            return Some(Self::AwsS3);
        }
        if lower.starts_with("az://") || lower.starts_with("azure://") {
            return Some(Self::AzureBlob);
        }
        if lower.starts_with("gs://") || lower.starts_with("gcs://") {
            return Some(Self::Gcs);
        }

        let after_scheme = lower
            .strip_prefix("http://")
            .or_else(|| lower.strip_prefix("https://"))?;
        // Only the host is inspected: a path such as `/backups/db.s3.dump` must not turn a
        // plain HTTP origin into an S3 provider.
        let host = host_of(after_scheme);

        let provider = if host.contains(".s3.") || host.contains(".amazonaws.com") {
            Self::AwsS3
        } else if host.contains(".blob.core.windows.net") || host.contains(".azure.") {
            Self::AzureBlob
        } else if host.contains("storage.googleapis.com")
            || host.contains("storage.cloud.google.com")
        {
            Self::Gcs
        } else {
            Self::Http
        };
        Some(provider)
    }

    /// Human-readable provider name (also used by `Display`).
    #[must_use]
    pub fn display_name(&self) -> &'static str {
        match self {
            Self::AwsS3 => "AWS S3",
            Self::AzureBlob => "Azure Blob Storage",
            Self::Gcs => "Google Cloud Storage",
            Self::Http => "HTTP/HTTPS",
            Self::Custom => "Custom Provider",
        }
    }

    /// Built-in approximate egress price per GB; HTTP and custom providers are treated as free.
    #[must_use]
    pub fn egress_cost_per_gb(&self) -> f64 {
        match self {
            Self::AwsS3 => 0.09,
            Self::AzureBlob => 0.087,
            Self::Gcs => 0.12,
            Self::Http => 0.0,
            Self::Custom => 0.0,
        }
    }

    /// Built-in approximate storage price per GB; HTTP and custom providers are treated as free.
    #[must_use]
    pub fn storage_cost_per_gb(&self) -> f64 {
        match self {
            Self::AwsS3 => 0.023,
            Self::AzureBlob => 0.018,
            Self::Gcs => 0.020,
            Self::Http => 0.0,
            Self::Custom => 0.0,
        }
    }
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.display_name())
    }
}
142
/// A geographic region, normalised across the AWS, Azure and GCS naming schemes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CloudRegion {
    /// AWS `us-east-1`, Azure `eastus`, GCS `us-east1`.
    UsEast1,
    /// AWS `us-east-2`, Azure `eastus2`, GCS `us-east4`.
    UsEast2,
    /// AWS `us-west-2`, Azure `westus2`, GCS `us-west1`.
    UsWest2,
    /// AWS `eu-west-1`, Azure `westeurope`, GCS `europe-west1`.
    EuWest1,
    /// AWS `eu-central-1`, Azure `germanywestcentral`, GCS `europe-west3`.
    EuCentral1,
    /// AWS `ap-northeast-1`, Azure `japaneast`, GCS `asia-northeast1`.
    ApNortheast1,
    /// AWS `ap-southeast-1`, Azure `southeastasia`, GCS `asia-southeast1`.
    ApSoutheast1,
    /// AWS `ap-southeast-2`, Azure `australiaeast`, GCS `australia-southeast1`.
    ApSoutheast2,
    /// Any other region; the string is used verbatim for every provider.
    Custom(String),
}

impl CloudRegion {
    /// Every well-known (non-custom) region, scanned by `from_string`.
    const KNOWN: [Self; 8] = [
        Self::UsEast1,
        Self::UsEast2,
        Self::UsWest2,
        Self::EuWest1,
        Self::EuCentral1,
        Self::ApNortheast1,
        Self::ApSoutheast1,
        Self::ApSoutheast2,
    ];

    /// Column indices into the row returned by `provider_code`.
    const AWS: usize = 0;
    const AZURE: usize = 1;
    const GCS: usize = 2;

    /// Parses a region name case-insensitively.
    ///
    /// Accepts the AWS code (`us-east-1`), the AWS code without dashes (`useast1`)
    /// or the Azure code (`eastus`). Anything else becomes `Custom` holding the input
    /// exactly as given.
    #[must_use]
    pub fn from_string(s: &str) -> Self {
        let needle = s.to_lowercase();
        for region in Self::KNOWN {
            let aws = region.aws_code();
            let is_match = needle == aws
                || needle == aws.replace('-', "")
                || needle == region.azure_code();
            if is_match {
                return region;
            }
        }
        Self::Custom(s.to_string())
    }

    /// Looks up one provider's name for this region; `Custom` returns its own string.
    fn provider_code(&self, column: usize) -> &str {
        let row: [&'static str; 3] = match self {
            Self::UsEast1 => ["us-east-1", "eastus", "us-east1"],
            Self::UsEast2 => ["us-east-2", "eastus2", "us-east4"],
            Self::UsWest2 => ["us-west-2", "westus2", "us-west1"],
            Self::EuWest1 => ["eu-west-1", "westeurope", "europe-west1"],
            Self::EuCentral1 => ["eu-central-1", "germanywestcentral", "europe-west3"],
            Self::ApNortheast1 => ["ap-northeast-1", "japaneast", "asia-northeast1"],
            Self::ApSoutheast1 => ["ap-southeast-1", "southeastasia", "asia-southeast1"],
            Self::ApSoutheast2 => ["ap-southeast-2", "australiaeast", "australia-southeast1"],
            Self::Custom(name) => return name.as_str(),
        };
        row[column]
    }

    /// AWS region code, e.g. `us-east-1`.
    #[must_use]
    pub fn aws_code(&self) -> &str {
        self.provider_code(Self::AWS)
    }

    /// Azure region name, e.g. `eastus`.
    #[must_use]
    pub fn azure_code(&self) -> &str {
        self.provider_code(Self::AZURE)
    }

    /// GCS region name, e.g. `us-east1`.
    #[must_use]
    pub fn gcs_code(&self) -> &str {
        self.provider_code(Self::GCS)
    }

    /// Rough round-trip latency estimate in milliseconds between two regions.
    ///
    /// Returns 1 for the same region. The table is symmetric, and pairs not listed
    /// (including any `Custom` region) default to 150.
    #[must_use]
    pub fn estimated_latency_to(&self, other: &Self) -> u32 {
        if self == other {
            return 1;
        }
        Self::one_way_latency(self, other)
            .or_else(|| Self::one_way_latency(other, self))
            .unwrap_or(150)
    }

    /// Each unordered region pair appears once here; the caller tries both orders.
    fn one_way_latency(from: &Self, to: &Self) -> Option<u32> {
        let millis = match (from, to) {
            (Self::UsEast1, Self::UsEast2) => 10,
            (Self::UsEast1 | Self::UsEast2, Self::UsWest2) => 65,
            (Self::UsEast1 | Self::UsEast2, Self::EuWest1 | Self::EuCentral1) => 80,
            (Self::UsEast1 | Self::UsEast2, Self::ApNortheast1 | Self::ApSoutheast1) => 180,
            (Self::UsWest2, Self::EuWest1 | Self::EuCentral1) => 140,
            (Self::UsWest2, Self::ApNortheast1 | Self::ApSoutheast1 | Self::ApSoutheast2) => 100,
            (Self::EuWest1, Self::EuCentral1) => 20,
            (Self::EuWest1 | Self::EuCentral1, Self::ApNortheast1) => 220,
            (Self::EuWest1 | Self::EuCentral1, Self::ApSoutheast1 | Self::ApSoutheast2) => 180,
            (Self::ApNortheast1, Self::ApSoutheast1 | Self::ApSoutheast2) => 80,
            (Self::ApSoutheast1, Self::ApSoutheast2) => 60,
            _ => return None,
        };
        Some(millis)
    }
}
282
283#[derive(Debug, Clone)]
289pub struct CloudProviderConfig {
290 pub provider: CloudProvider,
292 pub bucket: String,
294 pub prefix: String,
296 pub region: Option<CloudRegion>,
298 pub endpoint: Option<String>,
300 pub priority: u32,
302 pub weight: u32,
304 pub max_concurrent: usize,
306 pub timeout: Duration,
308 pub read_only: bool,
310 pub custom_egress_cost: Option<f64>,
312 pub options: HashMap<String, String>,
314}
315
316impl CloudProviderConfig {
317 #[must_use]
319 pub fn s3(bucket: impl Into<String>) -> Self {
320 Self {
321 provider: CloudProvider::AwsS3,
322 bucket: bucket.into(),
323 prefix: String::new(),
324 region: None,
325 endpoint: None,
326 priority: 1,
327 weight: 100,
328 max_concurrent: 100,
329 timeout: Duration::from_secs(300),
330 read_only: false,
331 custom_egress_cost: None,
332 options: HashMap::new(),
333 }
334 }
335
336 #[must_use]
338 pub fn azure(container: impl Into<String>) -> Self {
339 Self {
340 provider: CloudProvider::AzureBlob,
341 bucket: container.into(),
342 prefix: String::new(),
343 region: None,
344 endpoint: None,
345 priority: 1,
346 weight: 100,
347 max_concurrent: 100,
348 timeout: Duration::from_secs(300),
349 read_only: false,
350 custom_egress_cost: None,
351 options: HashMap::new(),
352 }
353 }
354
355 #[must_use]
357 pub fn gcs(bucket: impl Into<String>) -> Self {
358 Self {
359 provider: CloudProvider::Gcs,
360 bucket: bucket.into(),
361 prefix: String::new(),
362 region: None,
363 endpoint: None,
364 priority: 1,
365 weight: 100,
366 max_concurrent: 100,
367 timeout: Duration::from_secs(300),
368 read_only: false,
369 custom_egress_cost: None,
370 options: HashMap::new(),
371 }
372 }
373
374 #[must_use]
376 pub fn http(base_url: impl Into<String>) -> Self {
377 Self {
378 provider: CloudProvider::Http,
379 bucket: base_url.into(),
380 prefix: String::new(),
381 region: None,
382 endpoint: None,
383 priority: 1,
384 weight: 100,
385 max_concurrent: 100,
386 timeout: Duration::from_secs(300),
387 read_only: true, custom_egress_cost: None,
389 options: HashMap::new(),
390 }
391 }
392
393 #[must_use]
395 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
396 self.prefix = prefix.into();
397 self
398 }
399
400 #[must_use]
402 pub fn with_region(mut self, region: CloudRegion) -> Self {
403 self.region = Some(region);
404 self
405 }
406
407 #[must_use]
409 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
410 self.endpoint = Some(endpoint.into());
411 self
412 }
413
414 #[must_use]
416 pub fn with_priority(mut self, priority: u32) -> Self {
417 self.priority = priority;
418 self
419 }
420
421 #[must_use]
423 pub fn with_weight(mut self, weight: u32) -> Self {
424 self.weight = weight.min(100);
425 self
426 }
427
428 #[must_use]
430 pub fn with_read_only(mut self, read_only: bool) -> Self {
431 self.read_only = read_only;
432 self
433 }
434
435 #[must_use]
437 pub fn with_timeout(mut self, timeout: Duration) -> Self {
438 self.timeout = timeout;
439 self
440 }
441
442 #[must_use]
444 pub fn with_egress_cost(mut self, cost: f64) -> Self {
445 self.custom_egress_cost = Some(cost);
446 self
447 }
448
449 #[must_use]
451 pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
452 self.options.insert(key.into(), value.into());
453 self
454 }
455
456 #[must_use]
458 pub fn effective_egress_cost(&self) -> f64 {
459 self.custom_egress_cost
460 .unwrap_or_else(|| self.provider.egress_cost_per_gb())
461 }
462
463 #[must_use]
465 pub fn id(&self) -> String {
466 format!("{}:{}/{}", self.provider, self.bucket, self.prefix)
467 }
468}
469
/// Health snapshot for a single provider, updated from request outcomes.
#[derive(Debug, Clone)]
pub struct ProviderHealth {
    /// Identifier of the provider this snapshot describes.
    pub provider_id: String,
    /// Becomes `false` after three consecutive failures; any success resets it to `true`.
    pub healthy: bool,
    /// Time of the most recent success, if any.
    pub last_success: Option<Instant>,
    /// Time of the most recent failure, if any.
    pub last_failure: Option<Instant>,
    /// Failures since the last success.
    pub consecutive_failures: usize,
    /// Exponential moving average of successful-request latency (new samples weigh 0.1).
    pub avg_latency_ms: f64,
    /// Exponential moving average of outcomes (success = 1.0, failure = 0.0; weight 0.05).
    pub success_rate: f64,
    /// Successes plus failures.
    pub total_requests: u64,
    /// Bytes reported by successful requests.
    pub total_bytes: u64,
}

impl ProviderHealth {
    /// Fresh snapshot with no traffic: healthy, success rate 1.0, latency 0.0.
    fn new(provider_id: String) -> Self {
        Self {
            provider_id,
            healthy: true,
            last_success: None,
            last_failure: None,
            consecutive_failures: 0,
            avg_latency_ms: 0.0,
            success_rate: 1.0,
            total_requests: 0,
            total_bytes: 0,
        }
    }

    /// Records a successful request and marks the provider healthy.
    fn record_success(&mut self, latency_ms: f64, bytes: u64) {
        // Seed the latency average from the first *successful* sample. Testing
        // `total_requests == 1` instead would, after earlier failures, blend the first
        // real latency with the 0.0 placeholder and report ~10% of the true value.
        let first_sample = self.last_success.is_none();

        self.last_success = Some(Instant::now());
        self.consecutive_failures = 0;
        self.healthy = true;
        self.total_requests += 1;
        self.total_bytes += bytes;

        self.avg_latency_ms = if first_sample {
            latency_ms
        } else {
            self.avg_latency_ms * 0.9 + latency_ms * 0.1
        };

        self.update_success_rate(true);
    }

    /// Records a failed request; three consecutive failures mark the provider unhealthy.
    fn record_failure(&mut self) {
        self.last_failure = Some(Instant::now());
        self.consecutive_failures += 1;
        self.total_requests += 1;

        if self.consecutive_failures >= 3 {
            self.healthy = false;
        }

        self.update_success_rate(false);
    }

    /// Folds one outcome into the success-rate moving average.
    fn update_success_rate(&mut self, success: bool) {
        let success_value = if success { 1.0 } else { 0.0 };
        self.success_rate = self.success_rate * 0.95 + success_value * 0.05;
    }
}
551
/// How `MultiCloudManager` chooses among eligible providers for each request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RoutingStrategy {
    /// Lowest `priority` value wins (the default).
    #[default]
    Priority,
    /// Cycle through eligible providers in order.
    RoundRobin,
    /// Pick at random in proportion to each provider's `weight`.
    Weighted,
    /// Lowest observed average latency wins.
    LatencyBased,
    /// Lowest effective egress cost wins.
    CostOptimized,
    /// Lowest estimated latency from the client region wins; falls back to priority
    /// when no client region is configured.
    RegionAware,
    /// Lowest blended score of latency, cost, error rate and priority wins.
    Adaptive,
}
575
/// Lock-free per-provider request counters, updated with relaxed atomics.
struct ProviderStats {
    /// Successes plus errors.
    request_count: AtomicU64,
    /// Bytes moved by successful requests.
    byte_count: AtomicU64,
    /// Failed requests.
    error_count: AtomicU64,
    /// Sum of successful-request latencies in milliseconds.
    latency_sum_ms: AtomicU64,
}

impl ProviderStats {
    /// All counters start at zero.
    fn new() -> Self {
        Self {
            request_count: AtomicU64::new(0),
            byte_count: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
            latency_sum_ms: AtomicU64::new(0),
        }
    }

    /// Counts one successful request of `bytes` bytes taking `latency_ms`.
    fn record_success(&self, bytes: u64, latency_ms: u64) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.byte_count.fetch_add(bytes, Ordering::Relaxed);
        self.latency_sum_ms.fetch_add(latency_ms, Ordering::Relaxed);
    }

    /// Counts one failed request.
    fn record_error(&self) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean latency of successful requests in ms, or `f64::MAX` before any success.
    fn avg_latency_ms(&self) -> f64 {
        let requests = self.request_count.load(Ordering::Relaxed);
        let errors = self.error_count.load(Ordering::Relaxed);
        let successful = requests.saturating_sub(errors);
        if successful == 0 {
            return f64::MAX;
        }
        self.latency_sum_ms.load(Ordering::Relaxed) as f64 / successful as f64
    }

    /// Fraction of requests that succeeded, in `[0.0, 1.0]`; `1.0` before any traffic.
    fn success_rate(&self) -> f64 {
        let requests = self.request_count.load(Ordering::Relaxed);
        if requests == 0 {
            return 1.0;
        }
        // The counters are read separately without synchronisation, so errors recorded
        // concurrently can make `errors` exceed the earlier `requests` snapshot. Saturate
        // rather than underflow (a panic in debug builds, a huge bogus rate in release).
        let errors = self.error_count.load(Ordering::Relaxed);
        requests.saturating_sub(errors) as f64 / requests as f64
    }
}
628
629pub struct MultiCloudManager {
631 providers: Vec<CloudProviderConfig>,
633 routing_strategy: RoutingStrategy,
635 failover_enabled: bool,
637 max_failover_attempts: usize,
639 replication_enabled: bool,
641 client_region: Option<CloudRegion>,
643 stats: HashMap<String, Arc<ProviderStats>>,
645 round_robin_counter: AtomicUsize,
647 health_check_interval: Duration,
649}
650
651impl MultiCloudManager {
652 #[must_use]
654 pub fn builder() -> MultiCloudManagerBuilder {
655 MultiCloudManagerBuilder::new()
656 }
657
658 #[must_use]
660 pub fn providers(&self) -> &[CloudProviderConfig] {
661 &self.providers
662 }
663
664 #[must_use]
666 pub fn get_stats(&self, provider_id: &str) -> Option<(u64, u64, f64, f64)> {
667 self.stats.get(provider_id).map(|s| {
668 (
669 s.request_count.load(Ordering::Relaxed),
670 s.byte_count.load(Ordering::Relaxed),
671 s.avg_latency_ms(),
672 s.success_rate(),
673 )
674 })
675 }
676
677 fn select_provider(&self, operation: &str) -> Result<&CloudProviderConfig> {
679 if self.providers.is_empty() {
680 return Err(CloudError::InvalidConfiguration {
681 message: "No providers configured".to_string(),
682 });
683 }
684
685 let healthy_providers: Vec<_> = self
687 .providers
688 .iter()
689 .filter(|p| {
690 let stats = self.stats.get(&p.id());
691 stats.is_none_or(|s| s.success_rate() > 0.5)
692 })
693 .collect();
694
695 let candidates = if healthy_providers.is_empty() {
696 self.providers.iter().collect::<Vec<_>>()
698 } else {
699 healthy_providers
700 };
701
702 let write_operations = ["put", "delete", "write"];
704 let candidates: Vec<_> = if write_operations.contains(&operation.to_lowercase().as_str()) {
705 candidates.into_iter().filter(|p| !p.read_only).collect()
706 } else {
707 candidates
708 };
709
710 if candidates.is_empty() {
711 return Err(CloudError::NotSupported {
712 operation: format!("No writable providers available for {operation}"),
713 });
714 }
715
716 match self.routing_strategy {
717 RoutingStrategy::Priority => {
718 candidates
720 .iter()
721 .min_by_key(|p| p.priority)
722 .copied()
723 .ok_or_else(|| CloudError::Internal {
724 message: "Failed to select provider".to_string(),
725 })
726 }
727 RoutingStrategy::RoundRobin => {
728 let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed);
730 Ok(candidates[idx % candidates.len()])
731 }
732 RoutingStrategy::Weighted => {
733 let total_weight: u32 = candidates.iter().map(|p| p.weight).sum();
735 if total_weight == 0 {
736 return Ok(candidates[0]);
737 }
738
739 let target = simple_random() % total_weight;
740 let mut cumulative = 0u32;
741
742 for provider in &candidates {
743 cumulative += provider.weight;
744 if cumulative > target {
745 return Ok(provider);
746 }
747 }
748 Ok(candidates[0])
749 }
750 RoutingStrategy::LatencyBased => {
751 candidates
753 .iter()
754 .min_by(|a, b| {
755 let lat_a = self
756 .stats
757 .get(&a.id())
758 .map_or(f64::MAX, |s| s.avg_latency_ms());
759 let lat_b = self
760 .stats
761 .get(&b.id())
762 .map_or(f64::MAX, |s| s.avg_latency_ms());
763 lat_a
764 .partial_cmp(&lat_b)
765 .unwrap_or(std::cmp::Ordering::Equal)
766 })
767 .copied()
768 .ok_or_else(|| CloudError::Internal {
769 message: "Failed to select provider".to_string(),
770 })
771 }
772 RoutingStrategy::CostOptimized => {
773 candidates
775 .iter()
776 .min_by(|a, b| {
777 let cost_a = a.effective_egress_cost();
778 let cost_b = b.effective_egress_cost();
779 cost_a
780 .partial_cmp(&cost_b)
781 .unwrap_or(std::cmp::Ordering::Equal)
782 })
783 .copied()
784 .ok_or_else(|| CloudError::Internal {
785 message: "Failed to select provider".to_string(),
786 })
787 }
788 RoutingStrategy::RegionAware => {
789 if let Some(ref client_region) = self.client_region {
791 candidates
792 .iter()
793 .min_by_key(|p| {
794 p.region
795 .as_ref()
796 .map(|r| client_region.estimated_latency_to(r))
797 .unwrap_or(500)
798 })
799 .copied()
800 .ok_or_else(|| CloudError::Internal {
801 message: "Failed to select provider".to_string(),
802 })
803 } else {
804 candidates
806 .iter()
807 .min_by_key(|p| p.priority)
808 .copied()
809 .ok_or_else(|| CloudError::Internal {
810 message: "Failed to select provider".to_string(),
811 })
812 }
813 }
814 RoutingStrategy::Adaptive => {
815 candidates
817 .iter()
818 .min_by(|a, b| {
819 let score_a = self.calculate_adaptive_score(a);
820 let score_b = self.calculate_adaptive_score(b);
821 score_a
822 .partial_cmp(&score_b)
823 .unwrap_or(std::cmp::Ordering::Equal)
824 })
825 .copied()
826 .ok_or_else(|| CloudError::Internal {
827 message: "Failed to select provider".to_string(),
828 })
829 }
830 }
831 }
832
833 fn calculate_adaptive_score(&self, provider: &CloudProviderConfig) -> f64 {
835 let stats = self.stats.get(&provider.id());
836
837 let latency_score = stats.map_or(0.5, |s| {
839 let lat = s.avg_latency_ms();
840 if lat == f64::MAX {
841 1.0
842 } else {
843 (lat / 1000.0).min(1.0) }
845 });
846
847 let cost_score = provider.effective_egress_cost() / 0.2; let health_score = stats.map_or(0.0, |s| 1.0 - s.success_rate());
852
853 let priority_score = provider.priority as f64 / 10.0;
855
856 latency_score * 0.3 + cost_score * 0.3 + health_score * 0.3 + priority_score * 0.1
858 }
859
860 fn get_failover_providers(
862 &self,
863 failed_id: &str,
864 operation: &str,
865 ) -> Vec<&CloudProviderConfig> {
866 let write_operations = ["put", "delete", "write"];
867 let is_write = write_operations.contains(&operation.to_lowercase().as_str());
868
869 let mut candidates: Vec<_> = self
870 .providers
871 .iter()
872 .filter(|p| p.id() != failed_id && (!is_write || !p.read_only))
873 .collect();
874
875 candidates.sort_by_key(|p| p.priority);
877 candidates
878 }
879
880 #[cfg(feature = "async")]
882 pub async fn get(&self, key: &str) -> Result<Bytes> {
883 let provider = self.select_provider("get")?;
884 let start = Instant::now();
885
886 match self.get_from_provider(provider, key).await {
887 Ok(data) => {
888 if let Some(stats) = self.stats.get(&provider.id()) {
889 stats.record_success(data.len() as u64, start.elapsed().as_millis() as u64);
890 }
891 Ok(data)
892 }
893 Err(e) if self.failover_enabled => {
894 if let Some(stats) = self.stats.get(&provider.id()) {
895 stats.record_error();
896 }
897 tracing::warn!(
898 "Provider {} failed for get '{}': {}, attempting failover",
899 provider.id(),
900 key,
901 e
902 );
903 self.get_with_failover(key, &provider.id()).await
904 }
905 Err(e) => {
906 if let Some(stats) = self.stats.get(&provider.id()) {
907 stats.record_error();
908 }
909 Err(e)
910 }
911 }
912 }
913
914 #[cfg(feature = "async")]
915 async fn get_from_provider(
916 &self,
917 _provider: &CloudProviderConfig,
918 _key: &str,
919 ) -> Result<Bytes> {
920 Err(CloudError::NotSupported {
923 operation: "Backend creation not implemented in this context".to_string(),
924 })
925 }
926
927 #[cfg(feature = "async")]
928 async fn get_with_failover(&self, key: &str, failed_id: &str) -> Result<Bytes> {
929 let failover_providers = self.get_failover_providers(failed_id, "get");
930 let mut attempts = 0;
931
932 for provider in failover_providers {
933 if attempts >= self.max_failover_attempts {
934 break;
935 }
936 attempts += 1;
937
938 let start = Instant::now();
939 match self.get_from_provider(provider, key).await {
940 Ok(data) => {
941 if let Some(stats) = self.stats.get(&provider.id()) {
942 stats.record_success(data.len() as u64, start.elapsed().as_millis() as u64);
943 }
944 tracing::info!(
945 "Failover successful to provider {} for key '{}'",
946 provider.id(),
947 key
948 );
949 return Ok(data);
950 }
951 Err(e) => {
952 if let Some(stats) = self.stats.get(&provider.id()) {
953 stats.record_error();
954 }
955 tracing::warn!(
956 "Failover attempt {} to {} failed: {}",
957 attempts,
958 provider.id(),
959 e
960 );
961 }
962 }
963 }
964
965 Err(CloudError::Internal {
966 message: format!("All failover attempts exhausted for key '{key}'"),
967 })
968 }
969
970 #[cfg(feature = "async")]
972 pub async fn put(&self, key: &str, data: &[u8]) -> Result<()> {
973 let provider = self.select_provider("put")?;
974 let start = Instant::now();
975
976 match self.put_to_provider(provider, key, data).await {
977 Ok(()) => {
978 if let Some(stats) = self.stats.get(&provider.id()) {
979 stats.record_success(data.len() as u64, start.elapsed().as_millis() as u64);
980 }
981
982 if self.replication_enabled {
984 self.replicate_to_other_providers(key, data, &provider.id())
985 .await;
986 }
987
988 Ok(())
989 }
990 Err(e) if self.failover_enabled => {
991 if let Some(stats) = self.stats.get(&provider.id()) {
992 stats.record_error();
993 }
994 tracing::warn!(
995 "Provider {} failed for put '{}': {}, attempting failover",
996 provider.id(),
997 key,
998 e
999 );
1000 self.put_with_failover(key, data, &provider.id()).await
1001 }
1002 Err(e) => {
1003 if let Some(stats) = self.stats.get(&provider.id()) {
1004 stats.record_error();
1005 }
1006 Err(e)
1007 }
1008 }
1009 }
1010
1011 #[cfg(feature = "async")]
1012 async fn put_to_provider(
1013 &self,
1014 _provider: &CloudProviderConfig,
1015 _key: &str,
1016 _data: &[u8],
1017 ) -> Result<()> {
1018 Err(CloudError::NotSupported {
1019 operation: "Backend creation not implemented in this context".to_string(),
1020 })
1021 }
1022
1023 #[cfg(feature = "async")]
1024 async fn put_with_failover(&self, key: &str, data: &[u8], failed_id: &str) -> Result<()> {
1025 let failover_providers = self.get_failover_providers(failed_id, "put");
1026 let mut attempts = 0;
1027
1028 for provider in failover_providers {
1029 if attempts >= self.max_failover_attempts {
1030 break;
1031 }
1032 attempts += 1;
1033
1034 let start = Instant::now();
1035 match self.put_to_provider(provider, key, data).await {
1036 Ok(()) => {
1037 if let Some(stats) = self.stats.get(&provider.id()) {
1038 stats.record_success(data.len() as u64, start.elapsed().as_millis() as u64);
1039 }
1040 tracing::info!(
1041 "Failover successful to provider {} for put '{}'",
1042 provider.id(),
1043 key
1044 );
1045 return Ok(());
1046 }
1047 Err(e) => {
1048 if let Some(stats) = self.stats.get(&provider.id()) {
1049 stats.record_error();
1050 }
1051 tracing::warn!(
1052 "Failover attempt {} to {} failed: {}",
1053 attempts,
1054 provider.id(),
1055 e
1056 );
1057 }
1058 }
1059 }
1060
1061 Err(CloudError::Internal {
1062 message: format!("All failover attempts exhausted for put '{key}'"),
1063 })
1064 }
1065
1066 #[cfg(feature = "async")]
1067 async fn replicate_to_other_providers(&self, key: &str, data: &[u8], primary_id: &str) {
1068 let replication_targets: Vec<_> = self
1069 .providers
1070 .iter()
1071 .filter(|p| p.id() != primary_id && !p.read_only)
1072 .collect();
1073
1074 for provider in replication_targets {
1075 if let Err(e) = self.put_to_provider(provider, key, data).await {
1076 tracing::warn!(
1077 "Replication to {} failed for key '{}': {}",
1078 provider.id(),
1079 key,
1080 e
1081 );
1082 }
1083 }
1084 }
1085
1086 #[cfg(feature = "async")]
1088 pub async fn exists(&self, key: &str) -> Result<bool> {
1089 let provider = self.select_provider("exists")?;
1090
1091 match self.exists_in_provider(provider, key).await {
1092 Ok(exists) => Ok(exists),
1093 Err(e) if self.failover_enabled => {
1094 tracing::warn!(
1095 "Provider {} failed for exists '{}': {}, checking other providers",
1096 provider.id(),
1097 key,
1098 e
1099 );
1100
1101 for fallback in self.get_failover_providers(&provider.id(), "exists") {
1102 if let Ok(exists) = self.exists_in_provider(fallback, key).await {
1103 return Ok(exists);
1104 }
1105 }
1106 Err(e)
1107 }
1108 Err(e) => Err(e),
1109 }
1110 }
1111
1112 #[cfg(feature = "async")]
1113 async fn exists_in_provider(
1114 &self,
1115 _provider: &CloudProviderConfig,
1116 _key: &str,
1117 ) -> Result<bool> {
1118 Err(CloudError::NotSupported {
1119 operation: "Backend creation not implemented in this context".to_string(),
1120 })
1121 }
1122
1123 #[cfg(feature = "async")]
1125 pub async fn delete(&self, key: &str) -> Result<()> {
1126 let mut success = false;
1127 let mut last_error = None;
1128
1129 for provider in &self.providers {
1130 if provider.read_only {
1131 continue;
1132 }
1133
1134 match self.delete_from_provider(provider, key).await {
1135 Ok(()) => success = true,
1136 Err(e) => {
1137 tracing::warn!(
1138 "Failed to delete '{}' from provider {}: {}",
1139 key,
1140 provider.id(),
1141 e
1142 );
1143 last_error = Some(e);
1144 }
1145 }
1146 }
1147
1148 if success {
1149 Ok(())
1150 } else {
1151 Err(last_error.unwrap_or_else(|| CloudError::NotSupported {
1152 operation: "No writable providers available".to_string(),
1153 }))
1154 }
1155 }
1156
1157 #[cfg(feature = "async")]
1158 async fn delete_from_provider(
1159 &self,
1160 _provider: &CloudProviderConfig,
1161 _key: &str,
1162 ) -> Result<()> {
1163 Err(CloudError::NotSupported {
1164 operation: "Backend creation not implemented in this context".to_string(),
1165 })
1166 }
1167
1168 #[must_use]
1170 pub fn estimate_transfer_cost(&self, bytes: u64) -> TransferCostEstimate {
1171 let gb = bytes as f64 / (1024.0 * 1024.0 * 1024.0);
1172
1173 let mut estimates = Vec::new();
1174 for provider in &self.providers {
1175 let egress_cost = provider.effective_egress_cost() * gb;
1176 estimates.push((provider.id(), egress_cost));
1177 }
1178
1179 estimates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1180
1181 let (cheapest_id, cheapest_cost) = estimates.first().cloned().unwrap_or_default();
1182 let (most_expensive_id, most_expensive_cost) =
1183 estimates.last().cloned().unwrap_or_default();
1184
1185 TransferCostEstimate {
1186 bytes,
1187 cheapest_provider: cheapest_id,
1188 cheapest_cost,
1189 most_expensive_provider: most_expensive_id,
1190 most_expensive_cost,
1191 all_estimates: estimates,
1192 }
1193 }
1194}
1195
1196pub struct MultiCloudManagerBuilder {
1202 providers: Vec<CloudProviderConfig>,
1203 routing_strategy: RoutingStrategy,
1204 failover_enabled: bool,
1205 max_failover_attempts: usize,
1206 replication_enabled: bool,
1207 client_region: Option<CloudRegion>,
1208 health_check_interval: Duration,
1209}
1210
1211impl MultiCloudManagerBuilder {
1212 #[must_use]
1214 pub fn new() -> Self {
1215 Self {
1216 providers: Vec::new(),
1217 routing_strategy: RoutingStrategy::Priority,
1218 failover_enabled: true,
1219 max_failover_attempts: 3,
1220 replication_enabled: false,
1221 client_region: None,
1222 health_check_interval: Duration::from_secs(60),
1223 }
1224 }
1225
1226 #[must_use]
1228 pub fn add_provider(mut self, config: CloudProviderConfig) -> Self {
1229 self.providers.push(config);
1230 self
1231 }
1232
1233 #[must_use]
1235 pub fn with_routing_strategy(mut self, strategy: RoutingStrategy) -> Self {
1236 self.routing_strategy = strategy;
1237 self
1238 }
1239
1240 #[must_use]
1242 pub fn with_failover(mut self, enabled: bool) -> Self {
1243 self.failover_enabled = enabled;
1244 self
1245 }
1246
1247 #[must_use]
1249 pub fn with_max_failover_attempts(mut self, attempts: usize) -> Self {
1250 self.max_failover_attempts = attempts;
1251 self
1252 }
1253
1254 #[must_use]
1256 pub fn with_latency_routing(mut self, enabled: bool) -> Self {
1257 if enabled {
1258 self.routing_strategy = RoutingStrategy::LatencyBased;
1259 }
1260 self
1261 }
1262
1263 #[must_use]
1265 pub fn with_cost_routing(mut self, enabled: bool) -> Self {
1266 if enabled {
1267 self.routing_strategy = RoutingStrategy::CostOptimized;
1268 }
1269 self
1270 }
1271
1272 #[must_use]
1274 pub fn with_replication(mut self, enabled: bool) -> Self {
1275 self.replication_enabled = enabled;
1276 self
1277 }
1278
1279 #[must_use]
1281 pub fn with_client_region(mut self, region: CloudRegion) -> Self {
1282 self.client_region = Some(region);
1283 self
1284 }
1285
1286 #[must_use]
1288 pub fn with_health_check_interval(mut self, interval: Duration) -> Self {
1289 self.health_check_interval = interval;
1290 self
1291 }
1292
1293 pub fn build(self) -> Result<MultiCloudManager> {
1295 if self.providers.is_empty() {
1296 return Err(CloudError::InvalidConfiguration {
1297 message: "At least one provider must be configured".to_string(),
1298 });
1299 }
1300
1301 let mut stats = HashMap::new();
1302 for provider in &self.providers {
1303 stats.insert(provider.id(), Arc::new(ProviderStats::new()));
1304 }
1305
1306 Ok(MultiCloudManager {
1307 providers: self.providers,
1308 routing_strategy: self.routing_strategy,
1309 failover_enabled: self.failover_enabled,
1310 max_failover_attempts: self.max_failover_attempts,
1311 replication_enabled: self.replication_enabled,
1312 client_region: self.client_region,
1313 stats,
1314 round_robin_counter: AtomicUsize::new(0),
1315 health_check_interval: self.health_check_interval,
1316 })
1317 }
1318}
1319
1320impl Default for MultiCloudManagerBuilder {
1321 fn default() -> Self {
1322 Self::new()
1323 }
1324}
1325
/// Egress-cost comparison across providers for a transfer of `bytes` bytes.
///
/// The default value is all zeros and empty strings/lists.
#[derive(Debug, Clone, Default)]
pub struct TransferCostEstimate {
    /// Size of the transfer being estimated.
    pub bytes: u64,
    /// Id of the cheapest provider.
    pub cheapest_provider: String,
    /// Cost at the cheapest provider.
    pub cheapest_cost: f64,
    /// Id of the most expensive provider.
    pub most_expensive_provider: String,
    /// Cost at the most expensive provider.
    pub most_expensive_cost: f64,
    /// `(provider id, cost)` for every provider.
    pub all_estimates: Vec<(String, f64)>,
}
1359
/// Settings for copying objects from one provider to another.
#[derive(Debug, Clone)]
pub struct CrossCloudTransferConfig {
    /// Source provider id.
    pub source_provider: String,
    /// Destination provider id.
    pub dest_provider: String,
    /// Chunk size in bytes (default 8 MiB).
    pub chunk_size: usize,
    /// Concurrency setting (default 4).
    pub max_concurrent: usize,
    /// Integrity-verification flag (default `true`).
    pub verify_integrity: bool,
    /// Delete-source flag (default `false`).
    pub delete_source: bool,
}

impl Default for CrossCloudTransferConfig {
    fn default() -> Self {
        // 8 MiB expressed as a shift.
        let eight_mib = 8usize << 20;
        Self {
            source_provider: String::default(),
            dest_provider: String::default(),
            chunk_size: eight_mib,
            max_concurrent: 4,
            verify_integrity: true,
            delete_source: false,
        }
    }
}
1393
/// Outcome of a cross-cloud transfer.
#[derive(Debug, Clone)]
pub struct CrossCloudTransferResult {
    /// Number of objects copied.
    pub objects_transferred: usize,
    /// Total bytes copied.
    pub bytes_transferred: u64,
    /// Wall-clock duration of the transfer.
    pub duration: Duration,
    /// Pairs of strings describing failed objects (presumably `(key, error)` — confirm with producer).
    pub failures: Vec<(String, String)>,
    /// Estimated cost of the transfer.
    pub estimated_cost: f64,
}
1408
/// Returns a pseudo-random `u32` from a process-wide linear congruential generator.
///
/// Not cryptographically secure; used only for weighted provider selection. The state is
/// advanced with one atomic read-modify-write, so concurrent callers each consume a distinct
/// step of the sequence. A separate load and store would let two threads read the same seed
/// and get identical values.
fn simple_random() -> u32 {
    use std::sync::atomic::{AtomicU64, Ordering};

    const MULTIPLIER: u64 = 0x5_DEEC_E66D;
    const INCREMENT: u64 = 0xB;
    static SEED: AtomicU64 = AtomicU64::new(MULTIPLIER);

    let step = |state: u64| state.wrapping_mul(MULTIPLIER).wrapping_add(INCREMENT);

    // `fetch_update` retries under contention and returns the state it replaced; the
    // closure always yields `Some`, so the `Err` arm is never taken in practice.
    let previous = SEED
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| Some(step(s)))
        .unwrap_or_else(|current| current);
    let next = step(previous);

    // Drop the weak low-order bits of the LCG state; truncation to u32 is intentional.
    (next >> 17) as u32
}
1424
#[cfg(test)]
mod tests {
    //! Unit tests for provider detection, region helpers, provider configuration,
    //! manager building and routing, health/stat tracking and transfer defaults.

    use super::*;

    #[test]
    fn test_cloud_provider_from_url() {
        // (input URL, expected detection result)
        let cases: &[(&str, Option<CloudProvider>)] = &[
            ("s3://bucket/key", Some(CloudProvider::AwsS3)),
            ("gs://bucket/object", Some(CloudProvider::Gcs)),
            ("az://container/blob", Some(CloudProvider::AzureBlob)),
            ("https://example.com/path", Some(CloudProvider::Http)),
            (
                "https://mybucket.s3.amazonaws.com/key",
                Some(CloudProvider::AwsS3),
            ),
            (
                "https://account.blob.core.windows.net/container",
                Some(CloudProvider::AzureBlob),
            ),
            (
                "https://storage.googleapis.com/bucket/object",
                Some(CloudProvider::Gcs),
            ),
            ("invalid", None),
        ];

        for &(url, expected) in cases {
            assert_eq!(CloudProvider::from_url(url), expected, "url: {}", url);
        }
    }

    #[test]
    fn test_cloud_region_codes() {
        let east = CloudRegion::UsEast1;
        assert_eq!(east.aws_code(), "us-east-1");
        assert_eq!(east.azure_code(), "eastus");
        assert_eq!(east.gcs_code(), "us-east1");

        assert_eq!(CloudRegion::from_string("eu-west-1"), CloudRegion::EuWest1);
    }

    #[test]
    fn test_region_latency_estimation() {
        let (us_east, us_west, eu_west) = (
            CloudRegion::UsEast1,
            CloudRegion::UsWest2,
            CloudRegion::EuWest1,
        );

        // A region to itself is estimated at 1.
        assert_eq!(us_east.estimated_latency_to(&us_east), 1);

        // A US coast-to-coast estimate lies strictly between 50 and 100.
        let coast_to_coast = us_east.estimated_latency_to(&us_west);
        assert!((51..100).contains(&coast_to_coast));

        // Crossing the Atlantic is estimated higher than crossing the US.
        let transatlantic = us_east.estimated_latency_to(&eu_west);
        assert!(transatlantic > coast_to_coast);
    }

    #[test]
    fn test_provider_config_builder() {
        let timeout = Duration::from_secs(60);
        let config = CloudProviderConfig::s3("my-bucket")
            .with_prefix("data/")
            .with_region(CloudRegion::UsWest2)
            .with_priority(1)
            .with_weight(80)
            .with_timeout(timeout);

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix, "data/");
        assert_eq!(config.region, Some(CloudRegion::UsWest2));
        assert_eq!(config.priority, 1);
        assert_eq!(config.weight, 80);
        assert_eq!(config.timeout, timeout);
    }

    #[test]
    fn test_provider_config_id() {
        let config = CloudProviderConfig::s3("bucket").with_prefix("prefix");
        let expected_id = "AWS S3:bucket/prefix";
        assert_eq!(config.id(), expected_id);
    }

    #[test]
    fn test_egress_costs() {
        for paid in [CloudProvider::AwsS3, CloudProvider::AzureBlob, CloudProvider::Gcs].iter() {
            assert!(paid.egress_cost_per_gb() > 0.0, "{:?}", paid);
        }
        assert_eq!(CloudProvider::Http.egress_cost_per_gb(), 0.0);
    }

    #[test]
    fn test_multicloud_manager_builder() {
        let manager = MultiCloudManager::builder()
            .add_provider(CloudProviderConfig::s3("bucket1").with_priority(1))
            .add_provider(CloudProviderConfig::gcs("bucket2").with_priority(2))
            .with_failover(true)
            .with_latency_routing(true)
            .build()
            .expect("Manager should be built");

        assert_eq!(manager.providers.len(), 2);
        // Enabling latency routing selects the latency-based strategy.
        assert_eq!(manager.routing_strategy, RoutingStrategy::LatencyBased);
    }

    #[test]
    fn test_multicloud_manager_empty_providers() {
        // Building without any provider is rejected.
        assert!(MultiCloudManager::builder().build().is_err());
    }

    #[test]
    fn test_transfer_cost_estimate() {
        let manager = MultiCloudManager::builder()
            .add_provider(CloudProviderConfig::s3("bucket1"))
            .add_provider(CloudProviderConfig::http("http://example.com"))
            .build()
            .expect("Manager should be built");

        let one_gib = 1024 * 1024 * 1024;
        let estimate = manager.estimate_transfer_cost(one_gib);
        assert!(estimate.cheapest_cost <= estimate.most_expensive_cost);
    }

    #[test]
    fn test_provider_health() {
        let mut health = ProviderHealth::new("test-provider".to_string());
        assert!(health.healthy);
        assert_eq!(health.consecutive_failures, 0);

        health.record_success(100.0, 1000);
        health.record_success(120.0, 2000);
        assert!(health.avg_latency_ms > 0.0);
        assert_eq!(health.total_bytes, 3000);

        // After three consecutive failures the provider is reported unhealthy.
        for _ in 0..3 {
            health.record_failure();
        }
        assert!(!health.healthy);
        assert_eq!(health.consecutive_failures, 3);
    }

    #[test]
    fn test_provider_stats() {
        let eps = 0.001;
        let stats = ProviderStats::new();
        stats.record_success(1000, 50);
        stats.record_success(2000, 60);

        assert_eq!(stats.request_count.load(Ordering::Relaxed), 2);
        assert_eq!(stats.byte_count.load(Ordering::Relaxed), 3000);
        assert!((stats.avg_latency_ms() - 55.0).abs() < eps);
        assert!((stats.success_rate() - 1.0).abs() < eps);

        stats.record_error();
        assert!(stats.success_rate() < 1.0);
    }

    #[test]
    fn test_cross_cloud_transfer_config() {
        let CrossCloudTransferConfig {
            chunk_size,
            max_concurrent,
            verify_integrity,
            delete_source,
            ..
        } = CrossCloudTransferConfig::default();

        assert_eq!(chunk_size, 8 * 1024 * 1024);
        assert_eq!(max_concurrent, 4);
        assert!(verify_integrity);
        assert!(!delete_source);
    }

    #[test]
    fn test_routing_strategy_default() {
        assert_eq!(RoutingStrategy::default(), RoutingStrategy::Priority);
    }

    #[test]
    fn test_select_provider_priority() {
        // The lower priority number wins: GCS (priority 1) is chosen over S3 (priority 2).
        let manager = MultiCloudManager::builder()
            .add_provider(CloudProviderConfig::s3("bucket1").with_priority(2))
            .add_provider(CloudProviderConfig::gcs("bucket2").with_priority(1))
            .with_routing_strategy(RoutingStrategy::Priority)
            .build()
            .expect("Manager should be built");

        let chosen = manager
            .select_provider("get")
            .expect("Provider should be selected");
        assert_eq!(chosen.provider, CloudProvider::Gcs);
    }

    #[test]
    fn test_select_provider_cost_optimized() {
        // HTTP has a zero egress rate, so cost-optimized routing picks it.
        let manager = MultiCloudManager::builder()
            .add_provider(CloudProviderConfig::s3("bucket1"))
            .add_provider(CloudProviderConfig::http("http://example.com"))
            .with_routing_strategy(RoutingStrategy::CostOptimized)
            .build()
            .expect("Manager should be built");

        let chosen = manager
            .select_provider("get")
            .expect("Provider should be selected");
        assert_eq!(chosen.provider, CloudProvider::Http);
    }

    #[test]
    fn test_select_provider_write_filters_readonly() {
        // A "put" skips the HTTP provider (treated as read-only, per the test name)
        // even though it has the better priority.
        let manager = MultiCloudManager::builder()
            .add_provider(CloudProviderConfig::http("http://example.com").with_priority(1))
            .add_provider(CloudProviderConfig::s3("bucket1").with_priority(2))
            .with_routing_strategy(RoutingStrategy::Priority)
            .build()
            .expect("Manager should be built");

        let chosen = manager
            .select_provider("put")
            .expect("Provider should be selected");
        assert_eq!(chosen.provider, CloudProvider::AwsS3);
    }
}