1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::Bytes;
5
6use crate::error::{Error, Result};
7
8use super::backend::BackendKind;
9use super::client::RemoteBackend;
10use super::config::BucketConfig;
11use super::fetch::fetch_url;
12use super::options::PutOptions;
13use super::path::{generate_key, validate_path};
14
15#[cfg(any(test, feature = "test-helpers"))]
16use super::memory::MemoryBackend;
17
18#[non_exhaustive]
23pub struct PutInput {
24 pub data: Bytes,
26 pub prefix: String,
28 pub filename: Option<String>,
30 pub content_type: String,
32}
33
34impl PutInput {
35 pub fn new(
37 data: impl Into<bytes::Bytes>,
38 prefix: impl Into<String>,
39 content_type: impl Into<String>,
40 ) -> Self {
41 Self {
42 data: data.into(),
43 prefix: prefix.into(),
44 filename: None,
45 content_type: content_type.into(),
46 }
47 }
48
49 fn extension(&self) -> Option<String> {
51 let name = self.filename.as_deref()?;
52 if name.is_empty() {
53 return None;
54 }
55 let ext = name.rsplit('.').next()?;
56 if ext == name {
57 None
58 } else {
59 Some(ext.to_ascii_lowercase())
60 }
61 }
62}
63
/// Describes a remote file to download and store via
/// [`Storage::put_from_url`] or [`Storage::put_from_url_with`].
///
/// The struct is `#[non_exhaustive]`; code outside this crate builds it with
/// [`PutFromUrlInput::new`] and then sets `filename` if needed.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PutFromUrlInput {
    /// URL the file is downloaded from.
    pub url: String,
    /// Key prefix passed on to the upload.
    pub prefix: String,
    /// Optional filename passed on to the upload.
    pub filename: Option<String>,
}

impl PutFromUrlInput {
    /// Creates an input for `url` that will be stored under `prefix`.
    ///
    /// `filename` starts out as `None`.
    pub fn new(url: impl Into<String>, prefix: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            prefix: prefix.into(),
            filename: None,
        }
    }
}
85
86pub(crate) struct StorageInner {
87 pub(crate) backend: BackendKind,
88 pub(crate) public_url: Option<String>,
89 pub(crate) max_file_size: Option<usize>,
90 pub(crate) fetch_client: Option<reqwest::Client>,
91}
92
93pub struct Storage {
111 pub(crate) inner: Arc<StorageInner>,
112}
113
114impl Clone for Storage {
115 fn clone(&self) -> Self {
116 Self {
117 inner: Arc::clone(&self.inner),
118 }
119 }
120}
121
122impl Storage {
123 pub fn with_client(config: &BucketConfig, client: reqwest::Client) -> Result<Self> {
134 config.validate()?;
135
136 let region = config
137 .region
138 .clone()
139 .unwrap_or_else(|| "us-east-1".to_string());
140 let backend = RemoteBackend::new(
141 client,
142 config.bucket.clone(),
143 config.endpoint.clone(),
144 config.access_key.clone(),
145 config.secret_key.clone(),
146 region,
147 config.path_style,
148 )?;
149
150 let fetch_client = reqwest::Client::builder()
151 .redirect(reqwest::redirect::Policy::none())
152 .build()
153 .map_err(|e| Error::internal(format!("failed to build fetch HTTP client: {e}")))?;
154
155 Ok(Self {
156 inner: Arc::new(StorageInner {
157 backend: BackendKind::Remote(Box::new(backend)),
158 public_url: config.normalized_public_url(),
159 max_file_size: config.max_file_size_bytes()?,
160 fetch_client: Some(fetch_client),
161 }),
162 })
163 }
164
165 pub fn new(config: &BucketConfig) -> Result<Self> {
174 Self::with_client(config, reqwest::Client::new())
175 }
176
177 #[cfg(any(test, feature = "test-helpers"))]
182 pub fn memory() -> Self {
183 Self {
184 inner: Arc::new(StorageInner {
185 backend: BackendKind::Memory(MemoryBackend::new()),
186 public_url: Some("https://test.example.com".to_string()),
187 max_file_size: None,
188 fetch_client: None,
189 }),
190 }
191 }
192
193 pub async fn put(&self, input: &PutInput) -> Result<String> {
202 self.put_inner(input, &PutOptions::default()).await
203 }
204
205 pub async fn put_with(&self, input: &PutInput, opts: PutOptions) -> Result<String> {
211 self.put_inner(input, &opts).await
212 }
213
214 async fn put_inner(&self, input: &PutInput, opts: &PutOptions) -> Result<String> {
215 validate_path(&input.prefix)?;
216
217 if let Some(max) = self.inner.max_file_size
218 && input.data.len() > max
219 {
220 return Err(Error::payload_too_large(format!(
221 "file size {} exceeds maximum {}",
222 input.data.len(),
223 max
224 )));
225 }
226
227 let ext = input.extension();
228 let key = generate_key(&input.prefix, ext.as_deref());
229
230 let content_type = opts.content_type.as_deref().unwrap_or(&input.content_type);
231
232 let result = match &self.inner.backend {
233 BackendKind::Remote(b) => b.put(&key, input.data.clone(), content_type, opts).await,
234 BackendKind::Memory(b) => b.put(&key, input.data.clone(), content_type, opts).await,
235 };
236
237 if let Err(e) = result {
238 let delete_result = match &self.inner.backend {
239 BackendKind::Remote(b) => b.delete(&key).await,
240 BackendKind::Memory(b) => b.delete(&key).await,
241 };
242 if let Err(del_err) = delete_result {
243 tracing::warn!(key = %key, error = %del_err, "failed to clean up partial upload");
244 }
245 return Err(e);
246 }
247
248 tracing::info!(key = %key, size = input.data.len(), "file uploaded");
249 Ok(key)
250 }
251
252 pub async fn delete(&self, key: &str) -> Result<()> {
258 validate_path(key)?;
259 match &self.inner.backend {
260 BackendKind::Remote(b) => b.delete(key).await,
261 BackendKind::Memory(b) => b.delete(key).await,
262 }
263 .map_err(|e| Error::internal(format!("failed to delete file: {e}")))?;
264 tracing::info!(key = %key, "file deleted");
265 Ok(())
266 }
267
268 pub async fn delete_prefix(&self, prefix: &str) -> Result<()> {
275 validate_path(prefix)?;
276 let keys = match &self.inner.backend {
277 BackendKind::Remote(b) => b.list(prefix).await,
278 BackendKind::Memory(b) => b.list(prefix).await,
279 }
280 .map_err(|e| Error::internal(format!("failed to list prefix: {e}")))?;
281
282 for key in &keys {
283 match &self.inner.backend {
284 BackendKind::Remote(b) => b.delete(key).await,
285 BackendKind::Memory(b) => b.delete(key).await,
286 }
287 .map_err(|e| Error::internal(format!("failed to delete {key}: {e}")))?;
288 }
289
290 tracing::info!(prefix = %prefix, count = keys.len(), "prefix deleted");
291 Ok(())
292 }
293
294 pub fn url(&self, key: &str) -> Result<String> {
304 validate_path(key)?;
305 let base = self
306 .inner
307 .public_url
308 .as_ref()
309 .ok_or_else(|| Error::internal("public_url not configured"))?;
310 Ok(format!("{base}/{key}"))
311 }
312
313 pub async fn presigned_url(&self, key: &str, expires_in: Duration) -> Result<String> {
319 validate_path(key)?;
320 match &self.inner.backend {
321 BackendKind::Remote(b) => b.presigned_url(key, expires_in).await,
322 BackendKind::Memory(b) => b.presigned_url(key, expires_in).await,
323 }
324 .map_err(|e| Error::internal(format!("failed to generate presigned URL: {e}")))
325 }
326
327 pub async fn exists(&self, key: &str) -> Result<bool> {
333 validate_path(key)?;
334 match &self.inner.backend {
335 BackendKind::Remote(b) => b.exists(key).await,
336 BackendKind::Memory(b) => b.exists(key).await,
337 }
338 .map_err(|e| Error::internal(format!("failed to check existence: {e}")))
339 }
340
341 pub async fn put_from_url(&self, input: &PutFromUrlInput) -> Result<String> {
353 self.put_from_url_inner(input, &PutOptions::default()).await
354 }
355
356 pub async fn put_from_url_with(
365 &self,
366 input: &PutFromUrlInput,
367 opts: PutOptions,
368 ) -> Result<String> {
369 self.put_from_url_inner(input, &opts).await
370 }
371
372 async fn put_from_url_inner(
373 &self,
374 input: &PutFromUrlInput,
375 opts: &PutOptions,
376 ) -> Result<String> {
377 let client = self
378 .inner
379 .fetch_client
380 .as_ref()
381 .ok_or_else(|| Error::internal("URL fetch not supported in memory backend"))?;
382 let fetched = fetch_url(client, &input.url, self.inner.max_file_size).await?;
383
384 let put_input = PutInput {
385 data: fetched.data,
386 prefix: input.prefix.clone(),
387 filename: input.filename.clone(),
388 content_type: fetched.content_type,
389 };
390
391 self.put_inner(&put_input, opts).await
392 }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds a `PutInput` from literals so each test spells out only the
    /// values it cares about.
    fn upload(
        data: impl Into<Bytes>,
        prefix: &str,
        filename: Option<&str>,
        content_type: &str,
    ) -> PutInput {
        PutInput {
            data: data.into(),
            prefix: prefix.to_owned(),
            filename: filename.map(str::to_owned),
            content_type: content_type.to_owned(),
        }
    }

    /// In-memory storage without a public URL and with the given size limit.
    fn bare_memory_storage(max_file_size: Option<usize>) -> Storage {
        Storage {
            inner: Arc::new(StorageInner {
                backend: BackendKind::Memory(MemoryBackend::new()),
                public_url: None,
                max_file_size,
                fetch_client: None,
            }),
        }
    }

    #[tokio::test]
    async fn put_returns_key_with_prefix_and_extension() {
        let storage = Storage::memory();
        let input = upload("imgdata", "avatars/", Some("photo.jpg"), "image/jpeg");

        let key = storage.put(&input).await.unwrap();

        assert!(key.starts_with("avatars/"));
        assert!(key.ends_with(".jpg"));
    }

    #[tokio::test]
    async fn put_no_extension_without_filename() {
        let storage = Storage::memory();
        let input = upload("data", "raw/", None, "application/octet-stream");

        let key = storage.put(&input).await.unwrap();

        assert!(key.starts_with("raw/"));
        assert!(!key.contains('.'));
    }

    #[tokio::test]
    async fn put_no_extension_with_empty_filename() {
        let storage = Storage::memory();
        let input = upload("data", "raw/", Some(""), "application/octet-stream");

        let key = storage.put(&input).await.unwrap();

        assert!(!key.contains('.'));
    }

    #[tokio::test]
    async fn put_file_exists_after_upload() {
        let storage = Storage::memory();
        let input = upload("pdf content", "docs/", Some("doc.pdf"), "application/pdf");

        let key = storage.put(&input).await.unwrap();

        assert!(storage.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn put_respects_max_file_size() {
        let storage = bare_memory_storage(Some(5));
        let input = upload(
            vec![0u8; 10],
            "uploads/",
            Some("big.bin"),
            "application/octet-stream",
        );

        let err = storage.put(&input).await.unwrap_err();

        assert_eq!(err.status(), http::StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[tokio::test]
    async fn put_with_options() {
        let storage = Storage::memory();
        let input = upload("pdf", "reports/", Some("report.pdf"), "application/pdf");
        let opts = PutOptions {
            content_disposition: Some("attachment".into()),
            cache_control: Some("max-age=3600".into()),
            ..Default::default()
        };

        let key = storage.put_with(&input, opts).await.unwrap();

        assert!(storage.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn delete_removes_file() {
        let storage = Storage::memory();
        let input = upload("hello", "tmp/", Some("a.txt"), "text/plain");
        let key = storage.put(&input).await.unwrap();

        storage.delete(&key).await.unwrap();

        assert!(!storage.exists(&key).await.unwrap());
    }

    #[tokio::test]
    async fn delete_nonexistent_is_noop() {
        let storage = Storage::memory();

        storage.delete("nonexistent/file.txt").await.unwrap();
    }

    #[tokio::test]
    async fn delete_prefix_removes_all() {
        let storage = Storage::memory();
        let first = upload("a", "prefix/", Some("a.txt"), "text/plain");
        let second = upload("b", "prefix/", Some("b.txt"), "text/plain");
        let first_key = storage.put(&first).await.unwrap();
        let second_key = storage.put(&second).await.unwrap();

        storage.delete_prefix("prefix/").await.unwrap();

        assert!(!storage.exists(&first_key).await.unwrap());
        assert!(!storage.exists(&second_key).await.unwrap());
    }

    #[tokio::test]
    async fn url_returns_public_url() {
        let storage = Storage::memory();

        let url = storage.url("avatars/photo.jpg").unwrap();

        assert_eq!(url, "https://test.example.com/avatars/photo.jpg");
    }

    #[tokio::test]
    async fn url_errors_without_public_url() {
        let storage = bare_memory_storage(None);

        assert!(storage.url("key.jpg").is_err());
    }

    #[tokio::test]
    async fn presigned_url_works_on_memory() {
        let storage = Storage::memory();
        let lifetime = Duration::from_secs(3600);

        let url = storage.presigned_url("key.jpg", lifetime).await.unwrap();

        assert!(url.contains("key.jpg"));
        assert!(url.contains("expires=3600"));
    }

    #[tokio::test]
    async fn exists_false_for_missing() {
        let storage = Storage::memory();

        assert!(!storage.exists("nonexistent.jpg").await.unwrap());
    }

    #[tokio::test]
    async fn put_rejects_path_traversal() {
        let storage = Storage::memory();
        let input = upload("data", "../etc/", Some("f.txt"), "text/plain");

        assert!(storage.put(&input).await.is_err());
    }

    #[tokio::test]
    async fn put_rejects_absolute_path() {
        let storage = Storage::memory();
        let input = upload("data", "/root/", Some("f.txt"), "text/plain");

        assert!(storage.put(&input).await.is_err());
    }

    #[tokio::test]
    async fn put_rejects_empty_prefix() {
        let storage = Storage::memory();
        let input = upload("data", "", Some("f.txt"), "text/plain");

        assert!(storage.put(&input).await.is_err());
    }

    #[tokio::test]
    async fn put_from_url_memory_backend_returns_error() {
        let storage = Storage::memory();
        let mut input = PutFromUrlInput::new("https://example.com/file.jpg", "downloads/");
        input.filename = Some("file.jpg".into());

        let err = storage.put_from_url(&input).await.unwrap_err();

        assert_eq!(err.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
    }
}