1use crate::error::{CoreError, ErrorContext, ErrorLocation};
16use std::collections::HashMap;
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, SystemTime};
20use thiserror::Error;
21
22#[cfg(feature = "async")]
23#[allow(unused_imports)]
24use tokio::io::{AsyncRead, AsyncWrite};
25
26#[cfg(feature = "async")]
27use async_trait::async_trait;
28
// Names of the environment variables consulted by the
// `CloudCredentials::*_from_env` constructors.

/// AWS access key ID (required by `CloudCredentials::aws_from_env`).
const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
/// AWS secret access key (required by `CloudCredentials::aws_from_env`).
const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
/// AWS session token (optional; absent values become `None`).
const AWS_SESSION_TOKEN: &str = "AWS_SESSION_TOKEN";
/// AWS region (optional; `aws_from_env` falls back to `"us-east-1"`).
const AWS_REGION: &str = "AWS_REGION";

/// Value stored as the Google `service_account_key` (required by
/// `CloudCredentials::google_from_env`).
const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
/// Google Cloud project ID (required by `CloudCredentials::google_from_env`).
const GOOGLE_CLOUD_PROJECT: &str = "GOOGLE_CLOUD_PROJECT";

/// Azure storage account name (required by `CloudCredentials::azure_from_env`).
const AZURE_STORAGE_ACCOUNT: &str = "AZURE_STORAGE_ACCOUNT";
/// Azure storage account key (required by `CloudCredentials::azure_from_env`).
const AZURE_STORAGE_KEY: &str = "AZURE_STORAGE_KEY";
/// Azure SAS token (optional; absent values become `None`).
const AZURE_STORAGE_SAS_TOKEN: &str = "AZURE_STORAGE_SAS_TOKEN";
43
/// Errors reported by the cloud storage layer.
///
/// Each tuple variant carries a free-form detail string that is appended to
/// the fixed prefix declared in its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum CloudError {
    /// Authentication failed. Also returned by the `*_from_env` credential
    /// constructors when a required environment variable cannot be read.
    #[error("Authentication failed: {0}")]
    AuthenticationError(String),

    /// The named bucket/container does not exist.
    #[error("Bucket/container not found: {0}")]
    BucketNotFound(String),

    /// The named object does not exist.
    #[error("Object not found: {0}")]
    ObjectNotFound(String),

    /// The caller is not allowed to perform the operation.
    #[error("Permission denied: {0}")]
    PermissionDenied(String),

    /// A network-level failure occurred.
    #[error("Network error: {0}")]
    NetworkError(String),

    /// A service quota was exceeded.
    #[error("Service quota exceeded: {0}")]
    QuotaExceeded(String),

    /// The configuration is unusable, e.g. the credential kind does not match
    /// the backend being constructed.
    #[error("Invalid configuration: {0}")]
    InvalidConfiguration(String),

    /// An upload failed.
    #[error("Upload failed: {0}")]
    UploadError(String),

    /// A download failed.
    #[error("Download failed: {0}")]
    DownloadError(String),

    /// A multipart operation failed.
    #[error("Multipart operation failed: {0}")]
    MultipartError(String),

    /// A metadata operation failed.
    #[error("Metadata operation failed: {0}")]
    MetadataError(String),

    /// A provider-specific error, tagged with the provider's name.
    #[error("Cloud provider error: {provider} - {message}")]
    ProviderError { provider: String, message: String },
}
95
96impl From<CloudError> for CoreError {
97 fn from(err: CloudError) -> Self {
98 match err {
99 CloudError::AuthenticationError(msg) => CoreError::SecurityError(
100 ErrorContext::new(format!("{msg}"))
101 .with_location(ErrorLocation::new(file!(), line!())),
102 ),
103 CloudError::PermissionDenied(msg) => CoreError::SecurityError(
104 ErrorContext::new(format!("{msg}"))
105 .with_location(ErrorLocation::new(file!(), line!())),
106 ),
107 CloudError::NetworkError(msg) => CoreError::IoError(
108 ErrorContext::new(format!("{msg}"))
109 .with_location(ErrorLocation::new(file!(), line!())),
110 ),
111 _ => CoreError::IoError(
112 ErrorContext::new(format!("{err}"))
113 .with_location(ErrorLocation::new(file!(), line!())),
114 ),
115 }
116 }
117}
118
/// Storage services a [`CloudConfig`] can target.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CloudProvider {
    /// Amazon S3.
    AwsS3,
    /// Google Cloud Storage.
    GoogleCloud,
    /// Azure Blob Storage.
    AzureBlob,
    /// Any service speaking the S3 protocol.
    S3Compatible,
}

impl CloudProvider {
    /// Returns the provider's well-known endpoint, if it has one.
    ///
    /// `AzureBlob` and `S3Compatible` yield `None`: no fixed URL is defined
    /// for them here.
    pub fn default_endpoint(&self) -> Option<&'static str> {
        match *self {
            Self::AwsS3 => Some("https://s3.amazonaws.com"),
            Self::GoogleCloud => Some("https://storage.googleapis.com"),
            Self::AzureBlob | Self::S3Compatible => None,
        }
    }

    /// Returns the stable, lowercase, hyphenated identifier of the provider.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::AwsS3 => "aws-s3",
            Self::GoogleCloud => "google-cloud",
            Self::AzureBlob => "azure-blob",
            Self::S3Compatible => "s3-compatible",
        }
    }
}
153
/// Credentials for one of the supported providers.
///
/// `Debug` is implemented by hand so that secret material (secret keys,
/// session/SAS tokens, service-account keys) is never written to logs; only
/// non-secret identifiers are shown.
#[derive(Clone)]
pub enum CloudCredentials {
    /// AWS access-key credentials.
    Aws {
        /// Access key identifier.
        access_key_id: String,
        /// Secret access key (redacted in `Debug` output).
        secret_access_key: String,
        /// Optional session token (redacted in `Debug` output).
        session_token: Option<String>,
        /// Region name.
        region: String,
    },
    /// Google Cloud credentials.
    Google {
        /// Service-account key (redacted in `Debug` output).
        service_account_key: String,
        /// Project identifier.
        project_id: String,
    },
    /// Azure storage-account credentials.
    Azure {
        /// Storage account name.
        account_name: String,
        /// Storage account key (redacted in `Debug` output).
        account_key: String,
        /// Optional SAS token (redacted in `Debug` output).
        sas_token: Option<String>,
    },
    /// No credentials.
    Anonymous,
}

impl std::fmt::Debug for CloudCredentials {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder printed instead of any secret value.
        const REDACTED: &str = "<redacted>";

        match self {
            Self::Aws {
                access_key_id,
                session_token,
                region,
                ..
            } => f
                .debug_struct("Aws")
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &REDACTED)
                // Keep Some/None visible so presence of a token can be diagnosed.
                .field("session_token", &session_token.as_ref().map(|_| REDACTED))
                .field("region", region)
                .finish(),
            Self::Google { project_id, .. } => f
                .debug_struct("Google")
                .field("service_account_key", &REDACTED)
                .field("project_id", project_id)
                .finish(),
            Self::Azure {
                account_name,
                sas_token,
                ..
            } => f
                .debug_struct("Azure")
                .field("account_name", account_name)
                .field("account_key", &REDACTED)
                .field("sas_token", &sas_token.as_ref().map(|_| REDACTED))
                .finish(),
            Self::Anonymous => f.write_str("Anonymous"),
        }
    }
}
178
179impl CloudCredentials {
180 pub fn aws_from_env() -> Result<Self, CloudError> {
182 let access_key_id = std::env::var(AWS_ACCESS_KEY_ID).map_err(|_| {
183 CloudError::AuthenticationError("AWS_ACCESS_KEY_ID not found".to_string())
184 })?;
185 let secret_access_key = std::env::var(AWS_SECRET_ACCESS_KEY).map_err(|_| {
186 CloudError::AuthenticationError("AWS_SECRET_ACCESS_KEY not found".to_string())
187 })?;
188 let session_token = std::env::var(AWS_SESSION_TOKEN).ok();
189 let region = std::env::var(AWS_REGION).unwrap_or_else(|_| "us-east-1".to_string());
190
191 Ok(CloudCredentials::Aws {
192 access_key_id,
193 secret_access_key,
194 session_token,
195 region,
196 })
197 }
198
199 pub fn google_from_env() -> Result<Self, CloudError> {
201 let service_account_key = std::env::var(GOOGLE_APPLICATION_CREDENTIALS).map_err(|_| {
202 CloudError::AuthenticationError("GOOGLE_APPLICATION_CREDENTIALS not found".to_string())
203 })?;
204 let project_id = std::env::var(GOOGLE_CLOUD_PROJECT).map_err(|_| {
205 CloudError::AuthenticationError("GOOGLE_CLOUD_PROJECT not found".to_string())
206 })?;
207
208 Ok(CloudCredentials::Google {
209 service_account_key,
210 project_id,
211 })
212 }
213
214 pub fn azure_from_env() -> Result<Self, CloudError> {
216 let account_name = std::env::var(AZURE_STORAGE_ACCOUNT).map_err(|_| {
217 CloudError::AuthenticationError("AZURE_STORAGE_ACCOUNT not found".to_string())
218 })?;
219 let account_key = std::env::var(AZURE_STORAGE_KEY).map_err(|_| {
220 CloudError::AuthenticationError("AZURE_STORAGE_KEY not found".to_string())
221 })?;
222 let sas_token = std::env::var(AZURE_STORAGE_SAS_TOKEN).ok();
223
224 Ok(CloudCredentials::Azure {
225 account_name,
226 account_key,
227 sas_token,
228 })
229 }
230}
231
/// Settings used to construct a [`CloudStorageClient`].
///
/// Defaults are given by the `Default` implementation.
#[derive(Debug, Clone)]
pub struct CloudConfig {
    /// Storage service to talk to; selects which backend the client creates.
    pub provider: CloudProvider,
    /// Optional custom endpoint URL, set via `with_endpoint` (`None` by default).
    pub endpoint: Option<String>,
    /// Bucket name (or container name for Azure).
    pub bucket: String,
    /// Credentials; each backend rejects credential kinds belonging to
    /// another provider.
    pub credentials: CloudCredentials,
    /// Operation timeout (default 30 s).
    pub timeout: Duration,
    /// Maximum number of retries (default 3). NOTE(review): the field name
    /// lacks the underscore used by its siblings; renaming would break callers.
    pub maxretries: u32,
    /// Presumably the size in bytes above which multipart transfer is used
    /// (default 100 MiB) — TODO confirm.
    pub multipart_threshold: usize,
    /// Presumably the multipart chunk size in bytes (default 8 MiB) — TODO confirm.
    pub chunk_size: usize,
    /// Presumably the maximum number of concurrent operations (default 10)
    /// — TODO confirm.
    pub max_concurrency: usize,
    /// Whether the client keeps an in-memory metadata cache (default `true`).
    pub enable_cache: bool,
    /// Optional cache directory, set via `with_cache` (`None` by default).
    pub cache_dir: Option<PathBuf>,
}
258
259impl Default for CloudConfig {
260 fn default() -> Self {
261 Self {
262 provider: CloudProvider::AwsS3,
263 endpoint: None,
264 bucket: String::new(),
265 credentials: CloudCredentials::Anonymous,
266 timeout: Duration::from_secs(30),
267 maxretries: 3,
268 multipart_threshold: 100 * 1024 * 1024, chunk_size: 8 * 1024 * 1024, max_concurrency: 10,
271 enable_cache: true,
272 cache_dir: None,
273 }
274 }
275}
276
277impl CloudConfig {
278 pub fn new_bucket(bucket: String, credentials: CloudCredentials) -> Self {
280 Self {
281 provider: CloudProvider::AwsS3,
282 bucket,
283 credentials,
284 ..Default::default()
285 }
286 }
287
288 pub fn bucket_2(bucket: String, credentials: CloudCredentials) -> Self {
290 Self {
291 provider: CloudProvider::GoogleCloud,
292 bucket,
293 credentials,
294 ..Default::default()
295 }
296 }
297
298 pub fn container(container: String, credentials: CloudCredentials) -> Self {
300 Self {
301 provider: CloudProvider::AzureBlob,
302 bucket: container,
303 credentials,
304 ..Default::default()
305 }
306 }
307
308 pub fn with_endpoint(mut self, endpoint: String) -> Self {
310 self.endpoint = Some(endpoint);
311 self
312 }
313
314 pub fn with_timeout(mut self, timeout: Duration) -> Self {
316 self.timeout = timeout;
317 self
318 }
319
320 pub fn with_multipart(mut self, threshold: usize, chunk_size: usize) -> Self {
322 self.multipart_threshold = threshold;
323 self.chunk_size = chunk_size;
324 self
325 }
326
327 pub fn with_cache(mut self, enable: bool, cache_dir: Option<PathBuf>) -> Self {
329 self.enable_cache = enable;
330 self.cache_dir = cache_dir;
331 self
332 }
333}
334
/// Description of a stored object as returned by backend operations.
#[derive(Debug, Clone)]
pub struct CloudObjectMetadata {
    /// Object key.
    pub key: String,
    /// Object size in bytes.
    pub size: u64,
    /// Last-modified timestamp.
    pub last_modified: SystemTime,
    /// Entity tag, if the provider reported one.
    pub etag: Option<String>,
    /// Content (MIME) type, if known.
    pub content_type: Option<String>,
    /// User-defined key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Provider storage class/tier, if known (the mock backends report
    /// `"STANDARD"` or `"Hot"`).
    pub storage_class: Option<String>,
}
353
/// Boxed, thread-safe callback for reporting transfer progress.
///
/// Receives two `u64` values — presumably (bytes transferred, total bytes);
/// TODO confirm, no invocation is visible in this file.
pub type ProgressCallback = Box<dyn Fn(u64, u64) + Send + Sync>;
356
/// Per-operation options for uploads, downloads and copies.
///
/// `Default` yields no callback, empty metadata, no content type, no storage
/// class, `overwrite = false` and no encryption.
#[derive(Default)]
pub struct TransferOptions {
    /// Optional progress callback.
    pub progress_callback: Option<ProgressCallback>,
    /// User metadata to attach to the object.
    pub metadata: HashMap<String, String>,
    /// Content type; the mock backends fall back to
    /// `"application/octet-stream"` when unset.
    pub content_type: Option<String>,
    /// Storage class; the mock backends fall back to a provider default when unset.
    pub storage_class: Option<String>,
    /// Presumably whether an existing object may be replaced — TODO confirm,
    /// not read in the visible code.
    pub overwrite: bool,
    /// Optional encryption settings.
    pub encryption: Option<EncryptionConfig>,
}
373
/// Encryption settings for a transfer.
#[derive(Debug, Clone)]
pub struct EncryptionConfig {
    /// Which encryption scheme to use.
    pub method: EncryptionMethod,
    /// Optional key — presumably key material or a key identifier depending
    /// on `method`; TODO confirm. NOTE(review): the derived `Debug` prints
    /// this value verbatim.
    pub key: Option<String>,
}
382
/// Supported encryption schemes.
///
/// The variant descriptions below are inferred from the names only — TODO confirm.
#[derive(Debug, Clone)]
pub enum EncryptionMethod {
    /// Presumably server-side encryption with provider-managed keys.
    ServerSideManaged,
    /// Presumably server-side encryption with customer-managed keys.
    CustomerManaged,
    /// Presumably encryption with a key supplied by the caller.
    CustomerProvided,
}
393
/// One page of a listing operation.
#[derive(Debug, Clone)]
pub struct ListResult {
    /// Objects contained in this page.
    pub objects: Vec<CloudObjectMetadata>,
    /// Whether further pages exist; the `utils` helpers stop paging when this
    /// is `false`.
    pub has_more: bool,
    /// Token to pass as `continuation_token` to fetch the next page.
    pub next_token: Option<String>,
}
404
/// Provider-specific storage operations used by [`CloudStorageClient`].
///
/// Only compiled with the `async` feature. Implementors must be `Send + Sync`.
#[cfg(feature = "async")]
#[async_trait]
pub trait CloudStorageBackend: Send + Sync {
    /// Uploads a file and stores it under `key`.
    ///
    /// NOTE(review): the signature carries no local path, so an
    /// implementation cannot know which file to read — confirm intended design.
    async fn upload_file(
        &self,
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError>;

    /// Downloads an object to the local `path`.
    ///
    /// NOTE(review): the signature carries no object key, so an
    /// implementation cannot know which object to fetch — confirm intended design.
    async fn download_file(
        &self,
        path: &Path,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError>;

    /// Uploads an in-memory buffer and stores it under `key`.
    async fn upload_data(
        &self,
        data: &[u8],
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError>;

    /// Returns the full contents of the object stored under `key`.
    async fn get_object(&self, key: &str) -> Result<Vec<u8>, CloudError>;

    /// Returns the metadata of the object stored under `key`.
    async fn get_metadata(&self, key: &str) -> Result<CloudObjectMetadata, CloudError>;

    /// Reports whether an object exists under `key`.
    async fn object_exists(&self, key: &str) -> Result<bool, CloudError>;

    /// Deletes the object stored under `key`.
    async fn delete_object(&self, key: &str) -> Result<(), CloudError>;

    /// Lists one page of objects, optionally restricted to `prefix`.
    ///
    /// `continuation_token` is the `next_token` of a previous page, or `None`
    /// for the first page.
    async fn list_objects(
        &self,
        prefix: Option<&str>,
        continuation_token: Option<&str>,
    ) -> Result<ListResult, CloudError>;

    /// Copies the object at `source_key` to `dest_key` and returns the
    /// metadata of the copy.
    async fn copy_object(
        &self,
        source_key: &str,
        dest_key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError>;

    /// Produces a URL granting `method` access to `key` for the `expiration`
    /// duration.
    async fn generate_presigned_url(
        &self,
        key: &str,
        expiration: Duration,
        method: HttpMethod,
    ) -> Result<String, CloudError>;
}
466
/// HTTP verb a presigned URL is generated for.
#[derive(Debug, Clone, Copy)]
pub enum HttpMethod {
    /// `GET`
    Get,
    /// `PUT`
    Put,
    /// `POST`
    Post,
    /// `DELETE`
    Delete,
}
475
/// Provider-independent storage client.
///
/// Forwards operations to a provider backend and, when enabled, keeps an
/// in-memory cache of object metadata.
pub struct CloudStorageClient {
    /// Configuration the client was built from.
    config: CloudConfig,
    /// Backend selected from `config.provider`.
    backend: Box<dyn CloudStorageBackend>,
    /// Metadata cache; `None` when `config.enable_cache` was `false`.
    cache: Option<Arc<Mutex<CloudCache>>>,
}
482
/// In-memory metadata cache with a fixed time-to-live per entry.
#[derive(Debug)]
struct CloudCache {
    /// Cached metadata per object key, paired with the insertion time.
    metadata_cache: HashMap<String, (CloudObjectMetadata, SystemTime)>,
    /// Entries whose age reaches this duration are treated as expired.
    cache_ttl: Duration,
}
489
490impl CloudCache {
491 fn new(ttl: Duration) -> Self {
492 Self {
493 metadata_cache: HashMap::new(),
494 cache_ttl: ttl,
495 }
496 }
497
498 fn get_metadata(&mut self, key: &str) -> Option<CloudObjectMetadata> {
499 if let Some((metadata, timestamp)) = self.metadata_cache.get(key) {
500 if timestamp.elapsed().unwrap_or(Duration::MAX) < self.cache_ttl {
501 return Some(metadata.clone());
502 } else {
503 self.metadata_cache.remove(key);
504 }
505 }
506 None
507 }
508
509 fn put_metadata(&mut self, key: String, metadata: CloudObjectMetadata) {
510 self.metadata_cache
511 .insert(key, (metadata, SystemTime::now()));
512 }
513
514 fn invalidate(&mut self, key: &str) {
515 self.metadata_cache.remove(key);
516 }
517
518 fn clear(&mut self) {
519 self.metadata_cache.clear();
520 }
521}
522
523impl CloudStorageClient {
524 pub fn new(config: CloudConfig) -> Result<Self, CloudError> {
526 let backend = Self::create_backend(&config)?;
527 let cache = if config.enable_cache {
528 Some(Arc::new(Mutex::new(CloudCache::new(Duration::from_secs(
529 300,
530 )))))
531 } else {
532 None
533 };
534
535 Ok(Self {
536 config,
537 backend,
538 cache,
539 })
540 }
541
542 fn create_backend(config: &CloudConfig) -> Result<Box<dyn CloudStorageBackend>, CloudError> {
544 match config.provider {
545 CloudProvider::AwsS3 | CloudProvider::S3Compatible => {
546 Ok(Box::new(S3Backend::new(config.clone())?))
547 }
548 CloudProvider::GoogleCloud => Ok(Box::new(GoogleCloudBackend::new(config.clone())?)),
549 CloudProvider::AzureBlob => Ok(Box::new(AzureBackend::new(config.clone())?)),
550 }
551 }
552
553 #[cfg(feature = "async")]
555 pub async fn upload_file<P: AsRef<Path>>(
556 &self,
557 local_path: P,
558 remote_key: &str,
559 options: TransferOptions,
560 ) -> Result<CloudObjectMetadata, CloudError> {
561 let result = self.backend.upload_file(remote_key, options).await?;
562
563 if let Some(cache) = &self.cache {
565 cache
566 .lock()
567 .expect("Operation failed")
568 .put_metadata(remote_key.to_string(), result.clone());
569 }
570
571 Ok(result)
572 }
573
574 #[cfg(feature = "async")]
576 pub async fn download_file<P: AsRef<Path>>(
577 &self,
578 remote_key: &str,
579 local_path: P,
580 options: TransferOptions,
581 ) -> Result<CloudObjectMetadata, CloudError> {
582 self.backend
583 .download_file(local_path.as_ref(), options)
584 .await
585 }
586
587 #[cfg(feature = "async")]
589 pub async fn upload_data(
590 &self,
591 data: &[u8],
592 remote_key: &str,
593 options: TransferOptions,
594 ) -> Result<CloudObjectMetadata, CloudError> {
595 let result = self.backend.upload_data(data, remote_key, options).await?;
596
597 if let Some(cache) = &self.cache {
599 cache
600 .lock()
601 .expect("Operation failed")
602 .put_metadata(remote_key.to_string(), result.clone());
603 }
604
605 Ok(result)
606 }
607
608 #[cfg(feature = "async")]
610 pub async fn get_object(&self, key: &str) -> Result<Vec<u8>, CloudError> {
611 self.backend.get_object(key).await
612 }
613
614 #[cfg(feature = "async")]
616 pub async fn get_metadata(&self, key: &str) -> Result<CloudObjectMetadata, CloudError> {
617 if let Some(cache) = &self.cache {
619 if let Some(metadata) = cache.lock().expect("Operation failed").get_metadata(key) {
620 return Ok(metadata);
621 }
622 }
623
624 let metadata = self.backend.get_metadata(key).await?;
626
627 if let Some(cache) = &self.cache {
629 cache
630 .lock()
631 .expect("Operation failed")
632 .put_metadata(key.to_string(), metadata.clone());
633 }
634
635 Ok(metadata)
636 }
637
638 #[cfg(feature = "async")]
640 pub async fn object_exists(&self, key: &str) -> Result<bool, CloudError> {
641 self.backend.object_exists(key).await
642 }
643
644 #[cfg(feature = "async")]
646 pub async fn delete_object(&self, key: &str) -> Result<(), CloudError> {
647 let result = self.backend.delete_object(key).await;
648
649 if let Some(cache) = &self.cache {
651 cache.lock().expect("Operation failed").invalidate(key);
652 }
653
654 result
655 }
656
657 #[cfg(feature = "async")]
659 pub async fn list_objects(
660 &self,
661 prefix: Option<&str>,
662 continuation_token: Option<&str>,
663 ) -> Result<ListResult, CloudError> {
664 self.backend.list_objects(prefix, continuation_token).await
665 }
666
667 #[cfg(feature = "async")]
669 pub async fn copy_object(
670 &self,
671 source_key: &str,
672 dest_key: &str,
673 options: TransferOptions,
674 ) -> Result<CloudObjectMetadata, CloudError> {
675 let result = self
676 .backend
677 .copy_object(source_key, dest_key, options)
678 .await?;
679
680 if let Some(cache) = &self.cache {
682 cache
683 .lock()
684 .expect("Operation failed")
685 .put_metadata(dest_key.to_string(), result.clone());
686 }
687
688 Ok(result)
689 }
690
691 #[cfg(feature = "async")]
693 pub async fn generate_presigned_url(
694 &self,
695 remote_key: &str,
696 expiration: Duration,
697 method: HttpMethod,
698 ) -> Result<String, CloudError> {
699 self.backend
700 .generate_presigned_url(remote_key, expiration, method)
701 .await
702 }
703
704 pub fn clear_cache(&self) {
706 if let Some(cache) = &self.cache {
707 cache.lock().expect("Operation failed").clear();
708 }
709 }
710
711 pub fn config(&self) -> &CloudConfig {
713 &self.config
714 }
715}
716
717struct S3Backend {
719 config: CloudConfig,
720}
721
722impl S3Backend {
723 fn new(config: CloudConfig) -> Result<Self, CloudError> {
724 match &config.credentials {
726 CloudCredentials::Aws { .. } | CloudCredentials::Anonymous => {}
727 _ => {
728 return Err(CloudError::InvalidConfiguration(
729 "Invalid credentials for S3".to_string(),
730 ))
731 }
732 }
733
734 Ok(Self { config })
735 }
736}
737
/// Helpers shared by the mock S3 operations below.
#[cfg(feature = "async")]
impl S3Backend {
    /// ETag reported by the mock (the double quotes are part of the value).
    const MOCK_ETAG: &'static str = "\"mock-etag\"";
    /// Payload written by the mock `download_file`.
    const MOCK_DOWNLOAD_BODY: &'static [u8] = b"mock file content";
    /// Endpoint used when the configuration does not supply one.
    const FALLBACK_ENDPOINT: &'static str = "https://s3.amazonaws.com";

    /// Builds mock metadata for `key`, filling unset options with S3 defaults.
    fn mock_metadata(key: String, size: u64, options: TransferOptions) -> CloudObjectMetadata {
        CloudObjectMetadata {
            key,
            size,
            last_modified: SystemTime::now(),
            etag: Some(Self::MOCK_ETAG.to_string()),
            content_type: options
                .content_type
                .or_else(|| Some("application/octet-stream".to_string())),
            metadata: options.metadata,
            storage_class: options
                .storage_class
                .or_else(|| Some("STANDARD".to_string())),
        }
    }

    /// Base URL for generated links: the configured endpoint when present
    /// (needed for S3-compatible services), otherwise the public AWS endpoint.
    /// Trailing slashes are removed so joining never doubles them.
    fn base_url(&self) -> &str {
        self.config
            .endpoint
            .as_deref()
            .filter(|endpoint| !endpoint.is_empty())
            .unwrap_or(Self::FALLBACK_ENDPOINT)
            .trim_end_matches('/')
    }
}

/// Mock implementation: no network traffic happens; every operation returns
/// canned data derived from its arguments.
#[cfg(feature = "async")]
#[async_trait]
impl CloudStorageBackend for S3Backend {
    /// Pretends to upload a 1024-byte file to `key`.
    async fn upload_file(
        &self,
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(key.to_string(), 1024, options))
    }

    /// Writes a fixed payload to `path`; the returned `key` is the path itself.
    ///
    /// # Errors
    /// Returns `CloudError::DownloadError` when the file cannot be written.
    async fn download_file(
        &self,
        path: &Path,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        // NOTE(review): blocking std::fs call inside an async fn.
        std::fs::write(path, Self::MOCK_DOWNLOAD_BODY)
            .map_err(|e| CloudError::DownloadError(e.to_string()))?;

        Ok(Self::mock_metadata(
            path.to_string_lossy().to_string(),
            Self::MOCK_DOWNLOAD_BODY.len() as u64,
            options,
        ))
    }

    /// Pretends to upload `data`; the reported size is `data.len()`.
    async fn upload_data(
        &self,
        data: &[u8],
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            data.len() as u64,
            options,
        ))
    }

    /// Returns the key's own bytes as the object body.
    async fn get_object(&self, key: &str) -> Result<Vec<u8>, CloudError> {
        Ok(key.as_bytes().to_vec())
    }

    /// Returns canned metadata (1024 bytes, default content type and class).
    async fn get_metadata(&self, key: &str) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            1024,
            TransferOptions::default(),
        ))
    }

    /// Always reports that the object exists.
    async fn object_exists(&self, _key: &str) -> Result<bool, CloudError> {
        Ok(true)
    }

    /// Always succeeds without doing anything.
    async fn delete_object(&self, _key: &str) -> Result<(), CloudError> {
        Ok(())
    }

    /// Returns a single page of ten synthetic objects named `{prefix}_{i}`
    /// (or `object_{i}` without a prefix).
    ///
    /// NOTE(review): `next_token` echoes the incoming token even though
    /// `has_more` is `false`.
    async fn list_objects(
        &self,
        prefix: Option<&str>,
        continuation_token: Option<&str>,
    ) -> Result<ListResult, CloudError> {
        let objects = (0..10u64)
            .map(|i| CloudObjectMetadata {
                key: match prefix {
                    Some(prefix) => format!("{prefix}_{i}"),
                    None => format!("object_{i}"),
                },
                size: 1024 * (i + 1),
                last_modified: SystemTime::now(),
                etag: Some(format!("\"etag-{}\"", i)),
                content_type: Some("application/octet-stream".to_string()),
                metadata: HashMap::new(),
                storage_class: Some("STANDARD".to_string()),
            })
            .collect();

        Ok(ListResult {
            objects,
            has_more: false,
            next_token: continuation_token.map(str::to_string),
        })
    }

    /// Pretends to copy an object; the source key is not consulted.
    async fn copy_object(
        &self,
        _source_key: &str,
        dest_key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(dest_key.to_string(), 1024, options))
    }

    /// Builds an unsigned mock URL of the form
    /// `{endpoint}/{bucket}/{key}?expires={secs}&method={VERB}`.
    ///
    /// The configured endpoint is honoured so that S3-compatible services do
    /// not receive links pointing at AWS. The key is inserted verbatim
    /// (not URL-encoded).
    async fn generate_presigned_url(
        &self,
        key: &str,
        expiration: Duration,
        method: HttpMethod,
    ) -> Result<String, CloudError> {
        let verb = match method {
            HttpMethod::Get => "GET",
            HttpMethod::Put => "PUT",
            HttpMethod::Post => "POST",
            HttpMethod::Delete => "DELETE",
        };

        Ok(format!(
            "{}/{}/{}?expires={}&method={}",
            self.base_url(),
            self.config.bucket,
            key,
            expiration.as_secs(),
            verb
        ))
    }
}
920
921struct GoogleCloudBackend {
923 config: CloudConfig,
924}
925
926impl GoogleCloudBackend {
927 fn new(config: CloudConfig) -> Result<Self, CloudError> {
928 match &config.credentials {
930 CloudCredentials::Google { .. } | CloudCredentials::Anonymous => {}
931 _ => {
932 return Err(CloudError::InvalidConfiguration(
933 "Invalid credentials for GCS".to_string(),
934 ))
935 }
936 }
937
938 Ok(Self { config })
939 }
940}
941
/// Helpers shared by the mock GCS operations below.
#[cfg(feature = "async")]
impl GoogleCloudBackend {
    /// ETag reported by the mock.
    const MOCK_ETAG: &'static str = "mock-gcs-etag";
    /// Payload written by the mock `download_file`.
    const MOCK_DOWNLOAD_BODY: &'static [u8] = b"mock gcs content";
    /// Endpoint used when the configuration does not supply one.
    const FALLBACK_ENDPOINT: &'static str = "https://storage.googleapis.com";

    /// Builds mock metadata for `key`, filling unset options with GCS defaults.
    fn mock_metadata(key: String, size: u64, options: TransferOptions) -> CloudObjectMetadata {
        CloudObjectMetadata {
            key,
            size,
            last_modified: SystemTime::now(),
            etag: Some(Self::MOCK_ETAG.to_string()),
            content_type: options
                .content_type
                .or_else(|| Some("application/octet-stream".to_string())),
            metadata: options.metadata,
            storage_class: options
                .storage_class
                .or_else(|| Some("STANDARD".to_string())),
        }
    }

    /// Base URL for generated links: the configured endpoint when present,
    /// otherwise the public GCS endpoint. Trailing slashes are removed so
    /// joining never doubles them.
    fn base_url(&self) -> &str {
        self.config
            .endpoint
            .as_deref()
            .filter(|endpoint| !endpoint.is_empty())
            .unwrap_or(Self::FALLBACK_ENDPOINT)
            .trim_end_matches('/')
    }
}

/// Mock implementation: no network traffic happens; every operation returns
/// canned data derived from its arguments.
#[cfg(feature = "async")]
#[async_trait]
impl CloudStorageBackend for GoogleCloudBackend {
    /// Pretends to upload a 1024-byte file to `key`.
    async fn upload_file(
        &self,
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(key.to_string(), 1024, options))
    }

    /// Writes a fixed payload to `path`; the returned `key` is the path itself.
    ///
    /// # Errors
    /// Returns `CloudError::DownloadError` when the file cannot be written.
    async fn download_file(
        &self,
        path: &Path,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        // NOTE(review): blocking std::fs call inside an async fn.
        std::fs::write(path, Self::MOCK_DOWNLOAD_BODY)
            .map_err(|e| CloudError::DownloadError(e.to_string()))?;

        Ok(Self::mock_metadata(
            path.to_string_lossy().to_string(),
            Self::MOCK_DOWNLOAD_BODY.len() as u64,
            options,
        ))
    }

    /// Pretends to upload `data`; the reported size is `data.len()`.
    async fn upload_data(
        &self,
        data: &[u8],
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            data.len() as u64,
            options,
        ))
    }

    /// Returns the key's own bytes as the object body.
    async fn get_object(&self, key: &str) -> Result<Vec<u8>, CloudError> {
        Ok(key.as_bytes().to_vec())
    }

    /// Returns canned metadata (1024 bytes, default content type and class).
    async fn get_metadata(&self, key: &str) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            1024,
            TransferOptions::default(),
        ))
    }

    /// Always reports that the object exists.
    async fn object_exists(&self, _key: &str) -> Result<bool, CloudError> {
        Ok(true)
    }

    /// Always succeeds without doing anything.
    async fn delete_object(&self, _key: &str) -> Result<(), CloudError> {
        Ok(())
    }

    /// Returns a single page of ten synthetic objects named `{prefix}_{i}`
    /// (or `object_{i}` without a prefix).
    ///
    /// NOTE(review): `next_token` echoes the incoming token even though
    /// `has_more` is `false`.
    async fn list_objects(
        &self,
        prefix: Option<&str>,
        continuation_token: Option<&str>,
    ) -> Result<ListResult, CloudError> {
        let objects = (0..10u64)
            .map(|i| CloudObjectMetadata {
                key: match prefix {
                    Some(prefix) => format!("{prefix}_{i}"),
                    None => format!("object_{i}"),
                },
                size: 1024 * (i + 1),
                last_modified: SystemTime::now(),
                etag: Some(format!("gcs-etag-{i}")),
                content_type: Some("application/octet-stream".to_string()),
                metadata: HashMap::new(),
                storage_class: Some("STANDARD".to_string()),
            })
            .collect();

        Ok(ListResult {
            objects,
            has_more: false,
            next_token: continuation_token.map(str::to_string),
        })
    }

    /// Pretends to copy an object; the source key is not consulted.
    async fn copy_object(
        &self,
        _source_key: &str,
        dest_key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(dest_key.to_string(), 1024, options))
    }

    /// Builds an unsigned mock URL of the form
    /// `{endpoint}/{bucket}/{key}?expires={secs}&method={VERB}`.
    ///
    /// A configured custom endpoint takes precedence over the public GCS
    /// host. The key is inserted verbatim (not URL-encoded).
    async fn generate_presigned_url(
        &self,
        key: &str,
        expiration: Duration,
        method: HttpMethod,
    ) -> Result<String, CloudError> {
        let verb = match method {
            HttpMethod::Get => "GET",
            HttpMethod::Put => "PUT",
            HttpMethod::Post => "POST",
            HttpMethod::Delete => "DELETE",
        };

        Ok(format!(
            "{}/{}/{}?expires={}&method={}",
            self.base_url(),
            self.config.bucket,
            key,
            expiration.as_secs(),
            verb
        ))
    }
}
1112
1113struct AzureBackend {
1115 config: CloudConfig,
1116}
1117
1118impl AzureBackend {
1119 fn new(config: CloudConfig) -> Result<Self, CloudError> {
1120 match &config.credentials {
1122 CloudCredentials::Azure { .. } | CloudCredentials::Anonymous => {}
1123 _ => {
1124 return Err(CloudError::InvalidConfiguration(
1125 "Invalid credentials for Azure".to_string(),
1126 ))
1127 }
1128 }
1129
1130 Ok(Self { config })
1131 }
1132}
1133
/// Helpers shared by the mock Azure operations below.
#[cfg(feature = "async")]
impl AzureBackend {
    /// ETag reported by the mock.
    const MOCK_ETAG: &'static str = "mock-azure-etag";
    /// Payload written by the mock `download_file`.
    const MOCK_DOWNLOAD_BODY: &'static [u8] = b"mock azure content";

    /// Builds mock metadata for `key`, filling unset options with Azure defaults.
    fn mock_metadata(key: String, size: u64, options: TransferOptions) -> CloudObjectMetadata {
        CloudObjectMetadata {
            key,
            size,
            last_modified: SystemTime::now(),
            etag: Some(Self::MOCK_ETAG.to_string()),
            content_type: options
                .content_type
                .or_else(|| Some("application/octet-stream".to_string())),
            metadata: options.metadata,
            storage_class: options.storage_class.or_else(|| Some("Hot".to_string())),
        }
    }

    /// Base URL for generated links: the configured endpoint when present,
    /// otherwise `https://{account}.blob.core.windows.net`, where the account
    /// comes from the Azure credentials (`mockaccount` for anonymous access).
    fn base_url(&self) -> String {
        let custom = self
            .config
            .endpoint
            .as_deref()
            .filter(|endpoint| !endpoint.is_empty());

        match custom {
            Some(endpoint) => endpoint.trim_end_matches('/').to_string(),
            None => {
                let account_name = match &self.config.credentials {
                    CloudCredentials::Azure { account_name, .. } => account_name.as_str(),
                    _ => "mockaccount",
                };
                format!("https://{account_name}.blob.core.windows.net")
            }
        }
    }
}

/// Mock implementation: no network traffic happens; every operation returns
/// canned data derived from its arguments.
#[cfg(feature = "async")]
#[async_trait]
impl CloudStorageBackend for AzureBackend {
    /// Pretends to upload a 1024-byte file to `key`.
    async fn upload_file(
        &self,
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(key.to_string(), 1024, options))
    }

    /// Writes a fixed payload to `path`; the returned `key` is the path itself.
    ///
    /// # Errors
    /// Returns `CloudError::DownloadError` when the file cannot be written.
    async fn download_file(
        &self,
        path: &Path,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        // NOTE(review): blocking std::fs call inside an async fn.
        std::fs::write(path, Self::MOCK_DOWNLOAD_BODY)
            .map_err(|e| CloudError::DownloadError(e.to_string()))?;

        Ok(Self::mock_metadata(
            path.to_string_lossy().to_string(),
            Self::MOCK_DOWNLOAD_BODY.len() as u64,
            options,
        ))
    }

    /// Pretends to upload `data`; the reported size is `data.len()`.
    async fn upload_data(
        &self,
        data: &[u8],
        key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            data.len() as u64,
            options,
        ))
    }

    /// Returns the key's own bytes as the object body.
    async fn get_object(&self, key: &str) -> Result<Vec<u8>, CloudError> {
        Ok(key.as_bytes().to_vec())
    }

    /// Returns canned metadata (1024 bytes, default content type, `Hot` tier).
    async fn get_metadata(&self, key: &str) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(
            key.to_string(),
            1024,
            TransferOptions::default(),
        ))
    }

    /// Always reports that the object exists.
    async fn object_exists(&self, _key: &str) -> Result<bool, CloudError> {
        Ok(true)
    }

    /// Always succeeds without doing anything.
    async fn delete_object(&self, _key: &str) -> Result<(), CloudError> {
        Ok(())
    }

    /// Returns a single page of ten synthetic objects named `{prefix}_{i}`
    /// (or `object_{i}` without a prefix).
    ///
    /// NOTE(review): `next_token` echoes the incoming token even though
    /// `has_more` is `false`.
    async fn list_objects(
        &self,
        prefix: Option<&str>,
        continuation_token: Option<&str>,
    ) -> Result<ListResult, CloudError> {
        let objects = (0..10u64)
            .map(|i| CloudObjectMetadata {
                key: match prefix {
                    Some(prefix) => format!("{prefix}_{i}"),
                    None => format!("object_{i}"),
                },
                size: 1024 * (i + 1),
                last_modified: SystemTime::now(),
                etag: Some(format!("azure-etag-{i}")),
                content_type: Some("application/octet-stream".to_string()),
                metadata: HashMap::new(),
                storage_class: Some("Hot".to_string()),
            })
            .collect();

        Ok(ListResult {
            objects,
            has_more: false,
            next_token: continuation_token.map(str::to_string),
        })
    }

    /// Pretends to copy an object; the source key is not consulted.
    async fn copy_object(
        &self,
        _source_key: &str,
        dest_key: &str,
        options: TransferOptions,
    ) -> Result<CloudObjectMetadata, CloudError> {
        Ok(Self::mock_metadata(dest_key.to_string(), 1024, options))
    }

    /// Builds an unsigned mock URL of the form
    /// `{base}/{container}/{key}?expires={secs}&method={VERB}`.
    ///
    /// `{base}` is the configured endpoint when one is set, otherwise the
    /// account's `blob.core.windows.net` host. The key is inserted verbatim
    /// (not URL-encoded).
    async fn generate_presigned_url(
        &self,
        key: &str,
        expiration: Duration,
        method: HttpMethod,
    ) -> Result<String, CloudError> {
        let verb = match method {
            HttpMethod::Get => "GET",
            HttpMethod::Put => "PUT",
            HttpMethod::Post => "POST",
            HttpMethod::Delete => "DELETE",
        };

        Ok(format!(
            "{}/{}/{}?expires={}&method={}",
            self.base_url(),
            self.config.bucket,
            key,
            expiration.as_secs(),
            verb
        ))
    }
}
1301
/// Convenience helpers built on top of [`CloudStorageClient`].
pub mod utils {
    use super::*;

    /// Collects the regular files in `dir`, descending into sub-directories
    /// when `recursive` is set.
    ///
    /// NOTE(review): `Path::is_dir`/`is_file` follow symbolic links, so a
    /// symlink cycle under `dir` would recurse without bound.
    #[cfg(feature = "async")]
    fn collect_files(dir: &Path, recursive: bool, files: &mut Vec<PathBuf>) -> std::io::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();
            if path.is_file() {
                files.push(path);
            } else if recursive && path.is_dir() {
                collect_files(&path, recursive, files)?;
            }
        }
        Ok(())
    }

    /// Builds the object key for a file located at `relative_path` below the
    /// synced directory.
    ///
    /// Segments are always joined with `/`, whatever the platform's path
    /// separator is, and an empty or slash-terminated prefix never produces a
    /// leading or doubled slash.
    #[cfg(feature = "async")]
    fn remote_key_for(remote_prefix: &str, relative_path: &Path) -> String {
        let mut key = remote_prefix.trim_end_matches('/').to_string();
        for component in relative_path.components() {
            if let std::path::Component::Normal(part) = component {
                if !key.is_empty() {
                    key.push('/');
                }
                key.push_str(&part.to_string_lossy());
            }
        }
        key
    }

    /// Maps an object key to a path strictly inside `local_dir`.
    ///
    /// The prefix is stripped and leading slashes are dropped first:
    /// `Path::join` with an absolute path would otherwise *replace*
    /// `local_dir`. Keys containing `..` or other non-plain components are
    /// rejected so a hostile key cannot escape the target directory.
    #[cfg(feature = "async")]
    fn local_path_for(
        local_dir: &Path,
        remote_prefix: &str,
        key: &str,
    ) -> Result<PathBuf, CloudError> {
        let relative = key
            .strip_prefix(remote_prefix)
            .unwrap_or(key)
            .trim_start_matches('/');

        let mut local_path = local_dir.to_path_buf();
        for component in Path::new(relative).components() {
            match component {
                std::path::Component::Normal(part) => local_path.push(part),
                std::path::Component::CurDir => {}
                _ => {
                    return Err(CloudError::DownloadError(format!(
                        "object key '{key}' would be written outside the target directory"
                    )))
                }
            }
        }
        Ok(local_path)
    }

    /// Uploads every file in `local_dir` (optionally recursing) under
    /// `remote_prefix` and returns the number of files uploaded.
    ///
    /// # Errors
    /// Returns `CloudError::UploadError` when the directory cannot be read,
    /// and propagates any upload failure. Files uploaded before a failure are
    /// not rolled back.
    #[cfg(feature = "async")]
    pub async fn sync_directory_to_cloud(
        client: &CloudStorageClient,
        local_dir: &Path,
        remote_prefix: &str,
        recursive: bool,
    ) -> Result<usize, CloudError> {
        // NOTE(review): directory traversal uses blocking std::fs calls.
        let mut files = Vec::new();
        collect_files(local_dir, recursive, &mut files)
            .map_err(|e| CloudError::UploadError(e.to_string()))?;

        let mut uploaded_count = 0;
        for file_path in files {
            let relative_path = file_path
                .strip_prefix(local_dir)
                .map_err(|e| CloudError::UploadError(e.to_string()))?;
            let remote_key = remote_key_for(remote_prefix, relative_path);

            client
                .upload_file(&file_path, &remote_key, TransferOptions::default())
                .await?;
            uploaded_count += 1;
        }

        Ok(uploaded_count)
    }

    /// Downloads every object listed under `remote_prefix` into `local_dir`
    /// and returns the number of objects downloaded.
    ///
    /// # Errors
    /// Returns `CloudError::DownloadError` when a parent directory cannot be
    /// created or when a key would resolve outside `local_dir`, and
    /// propagates listing/download failures.
    #[cfg(feature = "async")]
    pub async fn sync_cloud_to_directory(
        client: &CloudStorageClient,
        remote_prefix: &str,
        local_dir: &Path,
    ) -> Result<usize, CloudError> {
        let mut downloaded_count = 0;
        let mut continuation_token: Option<String> = None;

        loop {
            let page = client
                .list_objects(Some(remote_prefix), continuation_token.as_deref())
                .await?;

            for object in &page.objects {
                let local_path = local_path_for(local_dir, remote_prefix, &object.key)?;

                if let Some(parent) = local_path.parent() {
                    std::fs::create_dir_all(parent)
                        .map_err(|e| CloudError::DownloadError(e.to_string()))?;
                }

                client
                    .download_file(&object.key, &local_path, TransferOptions::default())
                    .await?;
                downloaded_count += 1;
            }

            // Without a token the next request would restart at the first
            // page, so stop even if the backend claims there is more.
            match page.next_token {
                Some(token) if page.has_more => continuation_token = Some(token),
                _ => break,
            }
        }

        Ok(downloaded_count)
    }

    /// Sums the sizes of all objects listed under `prefix` (or of all objects
    /// when `prefix` is `None`).
    ///
    /// # Errors
    /// Propagates listing failures.
    #[cfg(feature = "async")]
    pub async fn calculate_storage_usage(
        client: &CloudStorageClient,
        prefix: Option<&str>,
    ) -> Result<u64, CloudError> {
        let mut total_size: u64 = 0;
        let mut continuation_token: Option<String> = None;

        loop {
            let page = client
                .list_objects(prefix, continuation_token.as_deref())
                .await?;

            total_size += page.objects.iter().map(|object| object.size).sum::<u64>();

            // Same guard as in `sync_cloud_to_directory`: never loop on a
            // missing token.
            match page.next_token {
                Some(token) if page.has_more => continuation_token = Some(token),
                _ => break,
            }
        }

        Ok(total_size)
    }
}
1429
#[cfg(test)]
mod tests {
    use super::*;
    #[allow(unused_imports)]
    use tempfile::tempdir;

    /// Checks the string identifiers and default endpoints of each provider.
    #[test]
    fn test_cloud_provider_methods() {
        assert_eq!(CloudProvider::AwsS3.as_str(), "aws-s3");
        assert_eq!(CloudProvider::GoogleCloud.as_str(), "google-cloud");
        assert_eq!(CloudProvider::AzureBlob.as_str(), "azure-blob");

        assert!(CloudProvider::AwsS3.default_endpoint().is_some());
        assert!(CloudProvider::GoogleCloud.default_endpoint().is_some());
        assert!(CloudProvider::AzureBlob.default_endpoint().is_none());
    }

    /// Checks the provider and bucket selected by the config constructors.
    #[test]
    fn test_cloud_config_builders() {
        let creds = CloudCredentials::Anonymous;

        let s3_config = CloudConfig::new_bucket("test-bucket".to_string(), creds.clone());
        assert_eq!(s3_config.provider, CloudProvider::AwsS3);
        assert_eq!(s3_config.bucket, "test-bucket");

        // NOTE(review): despite its name this config is built with the same
        // `new_bucket` constructor as above and is expected to be `AwsS3`;
        // confirm whether a Google Cloud specific constructor was intended.
        let gcs_config = CloudConfig::new_bucket("test-bucket".to_string(), creds.clone());
        assert_eq!(gcs_config.provider, CloudProvider::AwsS3);

        let azure_config = CloudConfig::container("test-container".to_string(), creds);
        assert_eq!(azure_config.provider, CloudProvider::AzureBlob);
    }

    /// Checks that the `with_*` modifiers store the values they are given.
    #[test]
    fn test_cloud_config_with_modifiers() {
        let config = CloudConfig::default()
            .with_endpoint("https://custom.endpoint.com".to_string())
            .with_timeout(Duration::from_secs(60))
            .with_multipart(50 * 1024 * 1024, 4 * 1024 * 1024);

        assert_eq!(
            config.endpoint,
            Some("https://custom.endpoint.com".to_string())
        );
        assert_eq!(config.timeout, Duration::from_secs(60));
        assert_eq!(config.multipart_threshold, 50 * 1024 * 1024);
        assert_eq!(config.chunk_size, 4 * 1024 * 1024);
    }

    /// Exercises metadata, existence, upload, download, listing and URL
    /// generation on an `S3Backend` built with anonymous credentials.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_s3_backend_operations() {
        let config =
            CloudConfig::new_bucket("test-bucket".to_string(), CloudCredentials::Anonymous);
        let backend = S3Backend::new(config).expect("Operation failed");

        // Metadata lookup echoes the key and reports a non-zero size.
        let metadata = backend
            .get_metadata("test-key")
            .await
            .expect("Operation failed");
        assert_eq!(metadata.key, "test-key");
        assert!(metadata.size > 0);

        // Existence check.
        let exists = backend
            .object_exists("test-key")
            .await
            .expect("Operation failed");
        assert!(exists);

        // Upload reports the key and the number of bytes sent.
        let data = b"test data";
        let result = backend
            .upload_data(data, "test-upload", TransferOptions::default())
            .await
            .expect("Operation failed");
        assert_eq!(result.key, "test-upload");
        assert_eq!(result.size, data.len() as u64);

        // Download returns some data.
        let downloaded = backend
            .get_object("test-key")
            .await
            .expect("Operation failed");
        assert!(!downloaded.is_empty());

        // Listing without a prefix; the second argument is the continuation
        // token slot (see its use in `utils`).
        let list_result = backend
            .list_objects(None, Some("5"))
            .await
            .expect("Operation failed");
        assert!(!list_result.objects.is_empty());
        assert!(list_result.objects.len() <= 10);

        // The generated URL embeds the key and the lifetime in seconds.
        let url = backend
            .generate_presigned_url("test-key", Duration::from_secs(3600), HttpMethod::Get)
            .await
            .expect("Operation failed");
        assert!(url.contains("test-key"));
        assert!(url.contains("expires=3600"));
    }

    /// Exercises repeated metadata lookups, cache clearing and upload through
    /// `CloudStorageClient`.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_cloud_storage_client() {
        let config =
            CloudConfig::new_bucket("test-bucket".to_string(), CloudCredentials::Anonymous);
        let client = CloudStorageClient::new(config).expect("Operation failed");

        // Two lookups of the same key must agree.
        let metadata1 = client
            .get_metadata("test-key")
            .await
            .expect("Operation failed");
        let metadata2 = client
            .get_metadata("test-key")
            .await
            .expect("Operation failed");
        assert_eq!(metadata1.key, metadata2.key);

        client.clear_cache();

        // Upload through the client reports the number of bytes sent.
        let data = b"test data for client";
        let result = client
            .upload_data(data, "client-test", TransferOptions::default())
            .await
            .expect("Operation failed");
        assert_eq!(result.size, data.len() as u64);
    }

    /// Checks that fields set on `TransferOptions` can be read back.
    #[test]
    fn test_transfer_options() {
        let mut options = TransferOptions::default();
        options
            .metadata
            .insert("custom-key".to_string(), "custom-value".to_string());
        options.content_type = Some("text/plain".to_string());
        options.overwrite = true;

        assert_eq!(
            options.metadata.get("custom-key"),
            Some(&"custom-value".to_string())
        );
        assert_eq!(options.content_type, Some("text/plain".to_string()));
        assert!(options.overwrite);
    }

    /// Checks that `EncryptionConfig` keeps the method and key it was built with.
    #[test]
    fn test_encryption_config() {
        let encryption = EncryptionConfig {
            method: EncryptionMethod::CustomerManaged,
            key: Some("test-key-id".to_string()),
        };

        // `matches!` gives a real assertion instead of `assert!(true)` /
        // `assert!(false)` inside a hand-written `match`.
        assert!(matches!(
            encryption.method,
            EncryptionMethod::CustomerManaged
        ));
        assert_eq!(encryption.key, Some("test-key-id".to_string()));
    }
}