1use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::Utc;
6use reqwest::{Client, StatusCode};
7use std::collections::HashMap;
8use url::Url;
9
10use crate::error::{CloudError, Result};
11use crate::security::Credentials;
12use crate::types::{
13 CloudStorage, DeleteResult, ListResult, ObjectInfo, ObjectMetadata, StorageClass, StorageStats,
14 UploadOptions,
15};
16
17pub struct GenericStorage {
19 client: Client,
20 endpoint: Url,
21 credentials: Credentials,
22 bucket: String,
23}
24
25impl GenericStorage {
26 pub fn new(endpoint: Url, credentials: Credentials, bucket: String) -> Result<Self> {
32 credentials.validate()?;
33
34 Ok(Self {
35 client: Client::new(),
36 endpoint,
37 credentials,
38 bucket,
39 })
40 }
41
42 pub async fn create_bucket(&self) -> Result<()> {
48 let url = format!("{}/{}", self.endpoint, self.bucket);
49
50 let response = self
51 .client
52 .put(&url)
53 .header("Authorization", self.auth_header("PUT", &self.bucket, ""))
54 .send()
55 .await?;
56
57 if !response.status().is_success() {
58 return Err(CloudError::Storage("Failed to create bucket".to_string()));
59 }
60
61 Ok(())
62 }
63
64 pub async fn delete_bucket(&self) -> Result<()> {
70 let url = format!("{}/{}", self.endpoint, self.bucket);
71
72 let response = self
73 .client
74 .delete(&url)
75 .header(
76 "Authorization",
77 self.auth_header("DELETE", &self.bucket, ""),
78 )
79 .send()
80 .await?;
81
82 if !response.status().is_success() {
83 return Err(CloudError::Storage("Failed to delete bucket".to_string()));
84 }
85
86 Ok(())
87 }
88
89 fn auth_header(&self, _method: &str, _path: &str, _query: &str) -> String {
91 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
93
94 format!(
96 "AWS4-HMAC-SHA256 Credential={}/{}/us-east-1/s3/aws4_request",
97 self.credentials.access_key, date
98 )
99 }
100
101 fn object_url(&self, key: &str) -> String {
103 format!("{}/{}/{}", self.endpoint, self.bucket, key)
104 }
105}
106
107#[async_trait]
108impl CloudStorage for GenericStorage {
109 async fn upload(&self, key: &str, data: Bytes) -> Result<()> {
110 let url = self.object_url(key);
111
112 let response = self
113 .client
114 .put(&url)
115 .header(
116 "Authorization",
117 self.auth_header("PUT", &format!("/{key}"), ""),
118 )
119 .header("Content-Type", "application/octet-stream")
120 .body(data)
121 .send()
122 .await?;
123
124 if !response.status().is_success() {
125 return Err(CloudError::Storage("Failed to upload object".to_string()));
126 }
127
128 Ok(())
129 }
130
131 async fn upload_with_options(
132 &self,
133 key: &str,
134 data: Bytes,
135 options: UploadOptions,
136 ) -> Result<()> {
137 let url = self.object_url(key);
138
139 let mut request = self.client.put(&url).header(
140 "Authorization",
141 self.auth_header("PUT", &format!("/{key}"), ""),
142 );
143
144 if let Some(content_type) = options.content_type {
145 request = request.header("Content-Type", content_type);
146 } else {
147 request = request.header("Content-Type", "application/octet-stream");
148 }
149
150 if let Some(cache_control) = options.cache_control {
151 request = request.header("Cache-Control", cache_control);
152 }
153
154 if let Some(content_encoding) = options.content_encoding {
155 request = request.header("Content-Encoding", content_encoding);
156 }
157
158 for (key, value) in options.metadata {
160 request = request.header(format!("x-amz-meta-{key}"), value);
161 }
162
163 let response = request.body(data).send().await?;
164
165 if !response.status().is_success() {
166 return Err(CloudError::Storage("Failed to upload object".to_string()));
167 }
168
169 Ok(())
170 }
171
172 async fn download(&self, key: &str) -> Result<Bytes> {
173 let url = self.object_url(key);
174
175 let response = self
176 .client
177 .get(&url)
178 .header(
179 "Authorization",
180 self.auth_header("GET", &format!("/{key}"), ""),
181 )
182 .send()
183 .await?;
184
185 if !response.status().is_success() {
186 return Err(CloudError::Storage("Failed to download object".to_string()));
187 }
188
189 let data = response.bytes().await?;
190 Ok(data)
191 }
192
193 async fn download_range(&self, key: &str, start: u64, end: u64) -> Result<Bytes> {
194 let url = self.object_url(key);
195 let range_header = format!("bytes={start}-{end}");
196
197 let response = self
198 .client
199 .get(&url)
200 .header(
201 "Authorization",
202 self.auth_header("GET", &format!("/{key}"), ""),
203 )
204 .header("Range", range_header)
205 .send()
206 .await?;
207
208 if !response.status().is_success() && response.status() != StatusCode::PARTIAL_CONTENT {
209 return Err(CloudError::Storage(
210 "Failed to download object range".to_string(),
211 ));
212 }
213
214 let data = response.bytes().await?;
215 Ok(data)
216 }
217
218 async fn list(&self, prefix: &str) -> Result<Vec<ObjectInfo>> {
219 let url = format!(
220 "{}{}?list-type=2&prefix={}",
221 self.endpoint, self.bucket, prefix
222 );
223
224 let response = self
225 .client
226 .get(&url)
227 .header(
228 "Authorization",
229 self.auth_header("GET", "", &format!("list-type=2&prefix={prefix}")),
230 )
231 .send()
232 .await?;
233
234 if !response.status().is_success() {
235 return Err(CloudError::Storage("Failed to list objects".to_string()));
236 }
237
238 let text = response.text().await?;
239
240 let mut objects = Vec::new();
242
243 for line in text.lines() {
245 if line.contains("<Key>") {
246 if let Some(key) = extract_xml_value(line, "Key") {
247 objects.push(ObjectInfo {
249 key,
250 size: 0,
251 last_modified: Utc::now(),
252 etag: None,
253 storage_class: None,
254 content_type: None,
255 });
256 }
257 }
258 }
259
260 Ok(objects)
261 }
262
263 async fn list_paginated(
264 &self,
265 prefix: &str,
266 continuation_token: Option<String>,
267 max_keys: usize,
268 ) -> Result<ListResult> {
269 let mut url = format!(
270 "{}{}?list-type=2&prefix={}&max-keys={}",
271 self.endpoint, self.bucket, prefix, max_keys
272 );
273
274 if let Some(token) = continuation_token {
275 url.push_str(&format!("&continuation-token={token}"));
276 }
277
278 let response = self
279 .client
280 .get(&url)
281 .header("Authorization", self.auth_header("GET", "", ""))
282 .send()
283 .await?;
284
285 if !response.status().is_success() {
286 return Err(CloudError::Storage("Failed to list objects".to_string()));
287 }
288
289 let text = response.text().await?;
290 let mut objects = Vec::new();
291
292 for line in text.lines() {
294 if line.contains("<Key>") {
295 if let Some(key) = extract_xml_value(line, "Key") {
296 objects.push(ObjectInfo {
297 key,
298 size: 0,
299 last_modified: Utc::now(),
300 etag: None,
301 storage_class: None,
302 content_type: None,
303 });
304 }
305 }
306 }
307
308 let is_truncated = text.contains("<IsTruncated>true</IsTruncated>");
309
310 Ok(ListResult {
311 objects,
312 continuation_token: None,
313 is_truncated,
314 common_prefixes: Vec::new(),
315 })
316 }
317
318 async fn delete(&self, key: &str) -> Result<()> {
319 let url = self.object_url(key);
320
321 let response = self
322 .client
323 .delete(&url)
324 .header(
325 "Authorization",
326 self.auth_header("DELETE", &format!("/{key}"), ""),
327 )
328 .send()
329 .await?;
330
331 if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
332 return Err(CloudError::Storage("Failed to delete object".to_string()));
333 }
334
335 Ok(())
336 }
337
338 async fn delete_batch(&self, keys: &[String]) -> Result<Vec<DeleteResult>> {
339 let mut results = Vec::new();
340
341 for key in keys {
342 match self.delete(key).await {
343 Ok(()) => results.push(DeleteResult {
344 key: key.clone(),
345 success: true,
346 error: None,
347 }),
348 Err(e) => results.push(DeleteResult {
349 key: key.clone(),
350 success: false,
351 error: Some(e.to_string()),
352 }),
353 }
354 }
355
356 Ok(results)
357 }
358
359 async fn get_metadata(&self, key: &str) -> Result<ObjectMetadata> {
360 let url = self.object_url(key);
361
362 let response = self
363 .client
364 .head(&url)
365 .header(
366 "Authorization",
367 self.auth_header("HEAD", &format!("/{key}"), ""),
368 )
369 .send()
370 .await?;
371
372 if !response.status().is_success() {
373 return Err(CloudError::Storage(
374 "Failed to get object metadata".to_string(),
375 ));
376 }
377
378 let headers = response.headers();
379 let size = headers
380 .get("Content-Length")
381 .and_then(|v| v.to_str().ok())
382 .and_then(|s| s.parse().ok())
383 .unwrap_or(0);
384
385 let content_type = headers
386 .get("Content-Type")
387 .and_then(|v| v.to_str().ok())
388 .map(ToString::to_string);
389
390 let info = ObjectInfo {
391 key: key.to_string(),
392 size,
393 last_modified: Utc::now(),
394 etag: headers
395 .get("ETag")
396 .and_then(|v| v.to_str().ok())
397 .map(ToString::to_string),
398 storage_class: None,
399 content_type,
400 };
401
402 let mut user_metadata = HashMap::new();
403 for (key, value) in headers {
404 if let Some(meta_key) = key.as_str().strip_prefix("x-amz-meta-") {
405 if let Ok(value_str) = value.to_str() {
406 user_metadata.insert(meta_key.to_string(), value_str.to_string());
407 }
408 }
409 }
410
411 Ok(ObjectMetadata {
412 info,
413 user_metadata,
414 system_metadata: HashMap::new(),
415 tags: HashMap::new(),
416 content_encoding: headers
417 .get("Content-Encoding")
418 .and_then(|v| v.to_str().ok())
419 .map(ToString::to_string),
420 content_language: None,
421 cache_control: headers
422 .get("Cache-Control")
423 .and_then(|v| v.to_str().ok())
424 .map(ToString::to_string),
425 content_disposition: None,
426 })
427 }
428
429 async fn update_metadata(&self, key: &str, _metadata: HashMap<String, String>) -> Result<()> {
430 self.copy(key, key).await
432 }
433
434 async fn exists(&self, key: &str) -> Result<bool> {
435 let url = self.object_url(key);
436
437 let response = self
438 .client
439 .head(&url)
440 .header(
441 "Authorization",
442 self.auth_header("HEAD", &format!("/{key}"), ""),
443 )
444 .send()
445 .await?;
446
447 Ok(response.status().is_success())
448 }
449
450 async fn copy(&self, source_key: &str, dest_key: &str) -> Result<()> {
451 let url = self.object_url(dest_key);
452 let copy_source = format!("/{}/{}", self.bucket, source_key);
453
454 let response = self
455 .client
456 .put(&url)
457 .header(
458 "Authorization",
459 self.auth_header("PUT", &format!("/{dest_key}"), ""),
460 )
461 .header("x-amz-copy-source", copy_source)
462 .send()
463 .await?;
464
465 if !response.status().is_success() {
466 return Err(CloudError::Storage("Failed to copy object".to_string()));
467 }
468
469 Ok(())
470 }
471
472 async fn presigned_download_url(&self, key: &str, expires_in_secs: u64) -> Result<String> {
473 let url = self.object_url(key);
475 Ok(format!("{url}?expires={expires_in_secs}"))
476 }
477
478 async fn presigned_upload_url(&self, key: &str, expires_in_secs: u64) -> Result<String> {
479 let url = self.object_url(key);
480 Ok(format!("{url}?expires={expires_in_secs}"))
481 }
482
483 async fn set_storage_class(&self, _key: &str, _class: StorageClass) -> Result<()> {
484 Ok(())
486 }
487
488 async fn get_stats(&self, prefix: &str) -> Result<StorageStats> {
489 let objects = self.list(prefix).await?;
490
491 let mut stats = StorageStats::default();
492 for obj in objects {
493 stats.total_size += obj.size;
494 stats.object_count += 1;
495 }
496
497 Ok(stats)
498 }
499}
500
501fn extract_xml_value(line: &str, tag: &str) -> Option<String> {
503 let start_tag = format!("<{tag}>");
504 let end_tag = format!("</{tag}>");
505
506 if let Some(start) = line.find(&start_tag) {
507 if let Some(end) = line.find(&end_tag) {
508 let value_start = start + start_tag.len();
509 if value_start < end {
510 return Some(line[value_start..end].to_string());
511 }
512 }
513 }
514 None
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520
521 #[test]
522 fn test_extract_xml_value() {
523 let line = "<Key>test-object.txt</Key>";
524 let value = extract_xml_value(line, "Key");
525 assert_eq!(value, Some("test-object.txt".to_string()));
526
527 let line2 = "<Size>1024</Size>";
528 let value2 = extract_xml_value(line2, "Size");
529 assert_eq!(value2, Some("1024".to_string()));
530 }
531
532 #[test]
533 fn test_extract_xml_value_no_match() {
534 let line = "<Name>bucket</Name>";
535 let value = extract_xml_value(line, "Key");
536 assert_eq!(value, None);
537 }
538
539 #[test]
540 fn test_generic_storage_creation() {
541 let endpoint = Url::parse("https://s3.example.com").expect("endpoint should be valid");
542 let credentials = Credentials::new("access".to_string(), "secret".to_string());
543 let storage = GenericStorage::new(endpoint, credentials, "test-bucket".to_string());
544 assert!(storage.is_ok());
545 }
546}