1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::Bytes;
5
6use crate::error::{Error, Result};
7
8use super::backend::BackendKind;
9use super::client::RemoteBackend;
10use super::config::BucketConfig;
11use super::fetch::fetch_url;
12use super::options::PutOptions;
13use super::path::{generate_key, validate_path};
14
15#[cfg(any(test, feature = "test-helpers"))]
16use super::memory::MemoryBackend;
17
18#[non_exhaustive]
23pub struct PutInput {
24 pub data: Bytes,
26 pub prefix: String,
28 pub filename: Option<String>,
30 pub content_type: String,
32}
33
34impl PutInput {
35 pub fn new(
37 data: impl Into<bytes::Bytes>,
38 prefix: impl Into<String>,
39 content_type: impl Into<String>,
40 ) -> Self {
41 Self {
42 data: data.into(),
43 prefix: prefix.into(),
44 filename: None,
45 content_type: content_type.into(),
46 }
47 }
48
49 fn extension(&self) -> Option<String> {
51 let name = self.filename.as_deref()?;
52 if name.is_empty() {
53 return None;
54 }
55 let ext = name.rsplit('.').next()?;
56 if ext == name {
57 None
58 } else {
59 Some(ext.to_ascii_lowercase())
60 }
61 }
62}
63
/// Description of an upload whose bytes are fetched from a remote URL, used
/// by [`Storage::put_from_url`] and [`Storage::put_from_url_with`].
#[non_exhaustive]
pub struct PutFromUrlInput {
    /// Source URL the object body is downloaded from.
    pub url: String,
    /// Key prefix for the stored object.
    pub prefix: String,
    /// Optional filename; only its extension is used, as a suffix for the
    /// generated key.
    pub filename: Option<String>,
}
74
75impl PutFromUrlInput {
76 pub fn new(url: impl Into<String>, prefix: impl Into<String>) -> Self {
78 Self {
79 url: url.into(),
80 prefix: prefix.into(),
81 filename: None,
82 }
83 }
84}
85
86pub(crate) struct StorageInner {
87 pub(crate) backend: BackendKind,
88 pub(crate) public_url: Option<String>,
89 pub(crate) max_file_size: Option<usize>,
90 pub(crate) fetch_client: Option<reqwest::Client>,
91}
92
93pub struct Storage {
99 pub(crate) inner: Arc<StorageInner>,
100}
101
102impl Clone for Storage {
103 fn clone(&self) -> Self {
104 Self {
105 inner: Arc::clone(&self.inner),
106 }
107 }
108}
109
110impl Storage {
111 pub fn with_client(config: &BucketConfig, client: reqwest::Client) -> Result<Self> {
122 config.validate()?;
123
124 let region = config
125 .region
126 .clone()
127 .unwrap_or_else(|| "us-east-1".to_string());
128 let backend = RemoteBackend::new(
129 client,
130 config.bucket.clone(),
131 config.endpoint.clone(),
132 config.access_key.clone(),
133 config.secret_key.clone(),
134 region,
135 config.path_style,
136 )?;
137
138 let fetch_client = reqwest::Client::builder()
139 .redirect(reqwest::redirect::Policy::none())
140 .build()
141 .map_err(|e| Error::internal(format!("failed to build fetch HTTP client: {e}")))?;
142
143 Ok(Self {
144 inner: Arc::new(StorageInner {
145 backend: BackendKind::Remote(Box::new(backend)),
146 public_url: config.normalized_public_url(),
147 max_file_size: config.max_file_size_bytes()?,
148 fetch_client: Some(fetch_client),
149 }),
150 })
151 }
152
153 pub fn new(config: &BucketConfig) -> Result<Self> {
162 Self::with_client(config, reqwest::Client::new())
163 }
164
165 #[cfg(any(test, feature = "test-helpers"))]
170 pub fn memory() -> Self {
171 Self {
172 inner: Arc::new(StorageInner {
173 backend: BackendKind::Memory(MemoryBackend::new()),
174 public_url: Some("https://test.example.com".to_string()),
175 max_file_size: None,
176 fetch_client: None,
177 }),
178 }
179 }
180
181 pub async fn put(&self, input: &PutInput) -> Result<String> {
190 self.put_inner(input, &PutOptions::default()).await
191 }
192
193 pub async fn put_with(&self, input: &PutInput, opts: PutOptions) -> Result<String> {
199 self.put_inner(input, &opts).await
200 }
201
202 async fn put_inner(&self, input: &PutInput, opts: &PutOptions) -> Result<String> {
203 validate_path(&input.prefix)?;
204
205 if let Some(max) = self.inner.max_file_size
206 && input.data.len() > max
207 {
208 return Err(Error::payload_too_large(format!(
209 "file size {} exceeds maximum {}",
210 input.data.len(),
211 max
212 )));
213 }
214
215 let ext = input.extension();
216 let key = generate_key(&input.prefix, ext.as_deref());
217
218 let content_type = opts.content_type.as_deref().unwrap_or(&input.content_type);
219
220 let result = match &self.inner.backend {
221 BackendKind::Remote(b) => b.put(&key, input.data.clone(), content_type, opts).await,
222 BackendKind::Memory(b) => b.put(&key, input.data.clone(), content_type, opts).await,
223 };
224
225 if let Err(e) = result {
226 let delete_result = match &self.inner.backend {
227 BackendKind::Remote(b) => b.delete(&key).await,
228 BackendKind::Memory(b) => b.delete(&key).await,
229 };
230 if let Err(del_err) = delete_result {
231 tracing::warn!(key = %key, error = %del_err, "failed to clean up partial upload");
232 }
233 return Err(e);
234 }
235
236 tracing::info!(key = %key, size = input.data.len(), "file uploaded");
237 Ok(key)
238 }
239
240 pub async fn delete(&self, key: &str) -> Result<()> {
246 validate_path(key)?;
247 match &self.inner.backend {
248 BackendKind::Remote(b) => b.delete(key).await,
249 BackendKind::Memory(b) => b.delete(key).await,
250 }
251 .map_err(|e| Error::internal(format!("failed to delete file: {e}")))?;
252 tracing::info!(key = %key, "file deleted");
253 Ok(())
254 }
255
256 pub async fn delete_prefix(&self, prefix: &str) -> Result<()> {
263 validate_path(prefix)?;
264 let keys = match &self.inner.backend {
265 BackendKind::Remote(b) => b.list(prefix).await,
266 BackendKind::Memory(b) => b.list(prefix).await,
267 }
268 .map_err(|e| Error::internal(format!("failed to list prefix: {e}")))?;
269
270 for key in &keys {
271 match &self.inner.backend {
272 BackendKind::Remote(b) => b.delete(key).await,
273 BackendKind::Memory(b) => b.delete(key).await,
274 }
275 .map_err(|e| Error::internal(format!("failed to delete {key}: {e}")))?;
276 }
277
278 tracing::info!(prefix = %prefix, count = keys.len(), "prefix deleted");
279 Ok(())
280 }
281
282 pub fn url(&self, key: &str) -> Result<String> {
292 validate_path(key)?;
293 let base = self
294 .inner
295 .public_url
296 .as_ref()
297 .ok_or_else(|| Error::internal("public_url not configured"))?;
298 Ok(format!("{base}/{key}"))
299 }
300
301 pub async fn presigned_url(&self, key: &str, expires_in: Duration) -> Result<String> {
307 validate_path(key)?;
308 match &self.inner.backend {
309 BackendKind::Remote(b) => b.presigned_url(key, expires_in).await,
310 BackendKind::Memory(b) => b.presigned_url(key, expires_in).await,
311 }
312 .map_err(|e| Error::internal(format!("failed to generate presigned URL: {e}")))
313 }
314
315 pub async fn exists(&self, key: &str) -> Result<bool> {
321 validate_path(key)?;
322 match &self.inner.backend {
323 BackendKind::Remote(b) => b.exists(key).await,
324 BackendKind::Memory(b) => b.exists(key).await,
325 }
326 .map_err(|e| Error::internal(format!("failed to check existence: {e}")))
327 }
328
329 pub async fn put_from_url(&self, input: &PutFromUrlInput) -> Result<String> {
341 self.put_from_url_inner(input, &PutOptions::default()).await
342 }
343
344 pub async fn put_from_url_with(
353 &self,
354 input: &PutFromUrlInput,
355 opts: PutOptions,
356 ) -> Result<String> {
357 self.put_from_url_inner(input, &opts).await
358 }
359
360 async fn put_from_url_inner(
361 &self,
362 input: &PutFromUrlInput,
363 opts: &PutOptions,
364 ) -> Result<String> {
365 let client = self
366 .inner
367 .fetch_client
368 .as_ref()
369 .ok_or_else(|| Error::internal("URL fetch not supported in memory backend"))?;
370 let fetched = fetch_url(client, &input.url, self.inner.max_file_size).await?;
371
372 let put_input = PutInput {
373 data: fetched.data,
374 prefix: input.prefix.clone(),
375 filename: input.filename.clone(),
376 content_type: fetched.content_type,
377 };
378
379 self.put_inner(&put_input, opts).await
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds a `PutInput` from borrowed parts so each test stays short.
    fn upload(
        data: impl Into<Bytes>,
        prefix: &str,
        filename: Option<&str>,
        content_type: &str,
    ) -> PutInput {
        PutInput {
            data: data.into(),
            prefix: prefix.to_owned(),
            filename: filename.map(str::to_owned),
            content_type: content_type.to_owned(),
        }
    }

    /// In-memory storage with no public URL and the given size limit.
    fn bare_memory_storage(max_file_size: Option<usize>) -> Storage {
        Storage {
            inner: Arc::new(StorageInner {
                backend: BackendKind::Memory(MemoryBackend::new()),
                public_url: None,
                max_file_size,
                fetch_client: None,
            }),
        }
    }

    /// Asserts that uploading under `prefix` is refused.
    async fn assert_prefix_rejected(prefix: &str) {
        let store = Storage::memory();
        let outcome = store
            .put(&upload("data", prefix, Some("f.txt"), "text/plain"))
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn put_returns_key_with_prefix_and_extension() {
        let store = Storage::memory();
        let key = store
            .put(&upload("imgdata", "avatars/", Some("photo.jpg"), "image/jpeg"))
            .await
            .unwrap();
        assert!(key.starts_with("avatars/"));
        assert!(key.ends_with(".jpg"));
    }

    #[tokio::test]
    async fn put_no_extension_without_filename() {
        let store = Storage::memory();
        let key = store
            .put(&upload("data", "raw/", None, "application/octet-stream"))
            .await
            .unwrap();
        assert!(key.starts_with("raw/"));
        assert!(!key.contains('.'));
    }

    #[tokio::test]
    async fn put_no_extension_with_empty_filename() {
        let store = Storage::memory();
        let key = store
            .put(&upload("data", "raw/", Some(""), "application/octet-stream"))
            .await
            .unwrap();
        assert!(!key.contains('.'));
    }

    #[tokio::test]
    async fn put_file_exists_after_upload() {
        let store = Storage::memory();
        let key = store
            .put(&upload("pdf content", "docs/", Some("doc.pdf"), "application/pdf"))
            .await
            .unwrap();
        assert!(store.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn put_respects_max_file_size() {
        let store = bare_memory_storage(Some(5));
        let oversized = upload(
            vec![0u8; 10],
            "uploads/",
            Some("big.bin"),
            "application/octet-stream",
        );
        let err = store.put(&oversized).await.err().unwrap();
        assert_eq!(err.status(), http::StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[tokio::test]
    async fn put_with_options() {
        let store = Storage::memory();
        let report = upload("pdf", "reports/", Some("report.pdf"), "application/pdf");
        let opts = PutOptions {
            content_disposition: Some("attachment".into()),
            cache_control: Some("max-age=3600".into()),
            ..Default::default()
        };
        let key = store.put_with(&report, opts).await.unwrap();
        assert!(store.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn delete_removes_file() {
        let store = Storage::memory();
        let key = store
            .put(&upload("hello", "tmp/", Some("a.txt"), "text/plain"))
            .await
            .unwrap();
        store.delete(&key).await.unwrap();
        assert!(!store.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn delete_nonexistent_is_noop() {
        let store = Storage::memory();
        store.delete("nonexistent/file.txt").await.unwrap();
    }

    #[tokio::test]
    async fn delete_prefix_removes_all() {
        let store = Storage::memory();
        let mut keys = Vec::new();
        for (body, name) in [("a", "a.txt"), ("b", "b.txt")] {
            let key = store
                .put(&upload(body, "prefix/", Some(name), "text/plain"))
                .await
                .unwrap();
            keys.push(key);
        }

        store.delete_prefix("prefix/").await.unwrap();

        for key in &keys {
            assert!(!store.exists(key).await.unwrap());
        }
    }

    #[tokio::test]
    async fn url_returns_public_url() {
        let store = Storage::memory();
        let url = store.url("avatars/photo.jpg").unwrap();
        assert_eq!(url, "https://test.example.com/avatars/photo.jpg");
    }

    #[tokio::test]
    async fn url_errors_without_public_url() {
        let store = bare_memory_storage(None);
        assert!(store.url("key.jpg").is_err());
    }

    #[tokio::test]
    async fn presigned_url_works_on_memory() {
        let store = Storage::memory();
        let ttl = Duration::from_secs(3600);
        let url = store.presigned_url("key.jpg", ttl).await.unwrap();
        assert!(url.contains("key.jpg"));
        assert!(url.contains("expires=3600"));
    }

    #[tokio::test]
    async fn exists_false_for_missing() {
        let store = Storage::memory();
        assert!(!store.exists("nonexistent.jpg").await.unwrap());
    }

    #[tokio::test]
    async fn put_rejects_path_traversal() {
        assert_prefix_rejected("../etc/").await;
    }

    #[tokio::test]
    async fn put_rejects_absolute_path() {
        assert_prefix_rejected("/root/").await;
    }

    #[tokio::test]
    async fn put_rejects_empty_prefix() {
        assert_prefix_rejected("").await;
    }

    #[tokio::test]
    async fn put_from_url_memory_backend_returns_error() {
        let store = Storage::memory();
        let request = PutFromUrlInput {
            url: "https://example.com/file.jpg".into(),
            prefix: "downloads/".into(),
            filename: Some("file.jpg".into()),
        };
        let err = store.put_from_url(&request).await.err().unwrap();
        assert_eq!(err.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
    }
}