1use std::path::PathBuf;
17
18use bytes::{Bytes, BytesMut};
19use dashmap::DashMap;
20use tokio::io::AsyncReadExt as _;
21use tracing::{debug, trace, warn};
22
23use crate::{checksums, error::S3ServiceError};
24
/// Map key for a stored object: `(bucket, key, version_id)`.
type StorageKey = (String, String, String);

/// Map key for an uploaded multipart part: `(bucket, upload_id, part_number)`.
type PartKey = (String, String, u32);

/// Default spill threshold in bytes (512 KiB). Payloads strictly larger than
/// the configured threshold are written to a temp file instead of being kept
/// in memory.
const DEFAULT_MAX_MEMORY_SIZE: usize = 512 * 1024;
36
/// Summary returned after an object or part has been stored.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// ETag of the stored payload. For single writes this is the hex MD5
    /// wrapped in double quotes; for completed multipart uploads it is the
    /// value produced by `checksums::compute_multipart_etag`.
    pub etag: String,
    /// Payload length in bytes.
    pub size: u64,
    /// Hex MD5 of the payload. For completed multipart uploads this is the
    /// part of the ETag before the first `-`, with the quotes stripped.
    pub md5_hex: String,
}
54
55enum StoredData {
65 InMemory {
67 data: Bytes,
69 },
70 OnDisk {
72 path: PathBuf,
74 size: u64,
76 },
77}
78
79impl std::fmt::Debug for StoredData {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 Self::InMemory { data } => f
83 .debug_struct("InMemory")
84 .field("size", &data.len())
85 .finish(),
86 Self::OnDisk { path, size } => f
87 .debug_struct("OnDisk")
88 .field("path", path)
89 .field("size", size)
90 .finish(),
91 }
92 }
93}
94
95impl Drop for StoredData {
96 fn drop(&mut self) {
97 if let Self::OnDisk { path, .. } = self {
98 let path = path.clone();
99 if let Ok(handle) = tokio::runtime::Handle::try_current() {
103 handle.spawn(async move {
104 if let Err(e) = tokio::fs::remove_file(&path).await {
105 if e.kind() != std::io::ErrorKind::NotFound {
106 warn!(path = %path.display(), error = %e, "failed to remove temp file");
107 }
108 } else {
109 trace!(path = %path.display(), "removed temp file");
110 }
111 });
112 }
113 }
114 }
115}
116
117impl StoredData {
118 async fn read_all(&self) -> Result<Bytes, S3ServiceError> {
120 match self {
121 Self::InMemory { data } => Ok(data.clone()),
122 Self::OnDisk { path, size } => {
123 let mut file = tokio::fs::File::open(path).await.map_err(|e| {
124 S3ServiceError::Internal(anyhow::anyhow!(
125 "failed to open temp file {}: {e}",
126 path.display()
127 ))
128 })?;
129 let capacity = usize::try_from(*size).unwrap_or(usize::MAX);
130 let mut buf = Vec::with_capacity(capacity);
131 file.read_to_end(&mut buf).await.map_err(|e| {
132 S3ServiceError::Internal(anyhow::anyhow!(
133 "failed to read temp file {}: {e}",
134 path.display()
135 ))
136 })?;
137 Ok(Bytes::from(buf))
138 }
139 }
140 }
141}
142
143pub struct InMemoryStorage {
175 objects: DashMap<StorageKey, StoredData>,
177 parts: DashMap<PartKey, StoredData>,
179 max_memory_size: usize,
181}
182
183impl std::fmt::Debug for InMemoryStorage {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("InMemoryStorage")
186 .field("objects_count", &self.objects.len())
187 .field("parts_count", &self.parts.len())
188 .field("max_memory_size", &self.max_memory_size)
189 .finish()
190 }
191}
192
193impl Default for InMemoryStorage {
194 fn default() -> Self {
195 Self::new(DEFAULT_MAX_MEMORY_SIZE)
196 }
197}
198
199impl InMemoryStorage {
200 #[must_use]
205 pub fn new(max_memory_size: usize) -> Self {
206 debug!(max_memory_size, "creating InMemoryStorage");
207 Self {
208 objects: DashMap::new(),
209 parts: DashMap::new(),
210 max_memory_size,
211 }
212 }
213
214 #[must_use]
216 pub fn default_max_memory_size() -> usize {
217 DEFAULT_MAX_MEMORY_SIZE
218 }
219
220 pub async fn write_object(
230 &self,
231 bucket: &str,
232 key: &str,
233 version_id: &str,
234 data: Bytes,
235 ) -> Result<WriteResult, S3ServiceError> {
236 let md5_hex = checksums::compute_md5(&data);
237 let etag = format!("\"{md5_hex}\"");
238 let size = data.len() as u64;
239
240 let stored = self.store_data(data).await?;
241
242 trace!(bucket, key, version_id, size, "stored object data");
243 self.objects.insert(
244 (bucket.to_owned(), key.to_owned(), version_id.to_owned()),
245 stored,
246 );
247
248 Ok(WriteResult {
249 etag,
250 size,
251 md5_hex,
252 })
253 }
254
255 pub async fn read_object(
266 &self,
267 bucket: &str,
268 key: &str,
269 version_id: &str,
270 range: Option<(u64, u64)>,
271 ) -> Result<Bytes, S3ServiceError> {
272 let storage_key = (bucket.to_owned(), key.to_owned(), version_id.to_owned());
273 let entry = self
274 .objects
275 .get(&storage_key)
276 .ok_or_else(|| S3ServiceError::NoSuchKey {
277 key: key.to_owned(),
278 })?;
279
280 let all_data = entry.value().read_all().await?;
281
282 match range {
283 Some((start, end)) => {
284 let data_len = all_data.len();
285 let start_idx = usize::try_from(start).map_err(|_| S3ServiceError::InvalidRange)?;
286 let end_idx = usize::try_from(end).map_err(|_| S3ServiceError::InvalidRange)?;
287 if start_idx >= data_len || end_idx >= data_len || start_idx > end_idx {
288 return Err(S3ServiceError::InvalidRange);
289 }
290 Ok(all_data.slice(start_idx..=end_idx))
291 }
292 None => Ok(all_data),
293 }
294 }
295
296 pub async fn copy_object(
306 &self,
307 src_bucket: &str,
308 src_key: &str,
309 src_version_id: &str,
310 dst_bucket: &str,
311 dst_key: &str,
312 dst_version_id: &str,
313 ) -> Result<WriteResult, S3ServiceError> {
314 let data = self
315 .read_object(src_bucket, src_key, src_version_id, None)
316 .await?;
317
318 debug!(
319 src_bucket,
320 src_key,
321 src_version_id,
322 dst_bucket,
323 dst_key,
324 dst_version_id,
325 size = data.len(),
326 "copying object data"
327 );
328
329 self.write_object(dst_bucket, dst_key, dst_version_id, data)
330 .await
331 }
332
333 pub fn delete_object(&self, bucket: &str, key: &str, version_id: &str) {
339 let storage_key = (bucket.to_owned(), key.to_owned(), version_id.to_owned());
340 if self.objects.remove(&storage_key).is_some() {
341 trace!(bucket, key, version_id, "deleted object data");
342 }
343 }
344
345 pub async fn write_part(
355 &self,
356 bucket: &str,
357 upload_id: &str,
358 part_number: u32,
359 data: Bytes,
360 ) -> Result<WriteResult, S3ServiceError> {
361 let md5_hex = checksums::compute_md5(&data);
362 let etag = format!("\"{md5_hex}\"");
363 let size = data.len() as u64;
364
365 let stored = self.store_data(data).await?;
366
367 trace!(bucket, upload_id, part_number, size, "stored part data");
368 self.parts.insert(
369 (bucket.to_owned(), upload_id.to_owned(), part_number),
370 stored,
371 );
372
373 Ok(WriteResult {
374 etag,
375 size,
376 md5_hex,
377 })
378 }
379
380 pub async fn read_part(
387 &self,
388 bucket: &str,
389 upload_id: &str,
390 part_number: u32,
391 ) -> Result<Bytes, S3ServiceError> {
392 let part_key = (bucket.to_owned(), upload_id.to_owned(), part_number);
393 let entry = self
394 .parts
395 .get(&part_key)
396 .ok_or(S3ServiceError::InvalidPart)?;
397
398 entry.value().read_all().await
399 }
400
401 pub async fn complete_multipart(
413 &self,
414 bucket: &str,
415 upload_id: &str,
416 key: &str,
417 version_id: &str,
418 part_numbers: &[u32],
419 ) -> Result<(WriteResult, Vec<String>), S3ServiceError> {
420 let mut combined = BytesMut::new();
421 let mut part_md5_hexes = Vec::with_capacity(part_numbers.len());
422
423 for &part_number in part_numbers {
424 let part_data = self.read_part(bucket, upload_id, part_number).await?;
425 let md5_hex = checksums::compute_md5(&part_data);
426 part_md5_hexes.push(md5_hex);
427 combined.extend_from_slice(&part_data);
428 }
429
430 let combined_bytes = combined.freeze();
431 let size = combined_bytes.len() as u64;
432
433 let etag = checksums::compute_multipart_etag(&part_md5_hexes, part_numbers.len());
435
436 let stored = self.store_data(combined_bytes).await?;
438 self.objects.insert(
439 (bucket.to_owned(), key.to_owned(), version_id.to_owned()),
440 stored,
441 );
442
443 self.abort_multipart(bucket, upload_id);
445
446 debug!(
447 bucket,
448 upload_id,
449 key,
450 version_id,
451 size,
452 parts = part_numbers.len(),
453 "completed multipart upload"
454 );
455
456 let composite_md5 = etag
459 .trim_matches('"')
460 .split('-')
461 .next()
462 .unwrap_or_default()
463 .to_owned();
464
465 Ok((
466 WriteResult {
467 etag,
468 size,
469 md5_hex: composite_md5,
470 },
471 part_md5_hexes,
472 ))
473 }
474
475 pub fn abort_multipart(&self, bucket: &str, upload_id: &str) {
480 self.parts.retain(|key, _| {
481 let matches = key.0 == bucket && key.1 == upload_id;
482 if matches {
483 trace!(bucket, upload_id, part_number = key.2, "removing part data");
484 }
485 !matches
486 });
487 }
488
489 pub fn delete_bucket_data(&self, bucket: &str) {
494 let obj_before = self.objects.len();
495 self.objects.retain(|key, _| key.0 != bucket);
496 let obj_removed = obj_before - self.objects.len();
497
498 let part_removed = self.remove_parts_by_bucket(bucket);
499
500 debug!(
501 bucket,
502 objects_removed = obj_removed,
503 parts_removed = part_removed,
504 "deleted all bucket data"
505 );
506 }
507
508 pub fn reset(&self) {
510 debug!("resetting all storage data");
511 self.objects.clear();
512 self.parts.clear();
513 }
514
515 async fn store_data(&self, data: Bytes) -> Result<StoredData, S3ServiceError> {
521 if data.len() > self.max_memory_size {
522 self.spill_to_disk(&data).await
523 } else {
524 Ok(StoredData::InMemory { data })
525 }
526 }
527
528 async fn spill_to_disk(&self, data: &[u8]) -> Result<StoredData, S3ServiceError> {
530 let size = data.len() as u64;
531
532 let temp = tempfile::NamedTempFile::new().map_err(|e| {
536 S3ServiceError::Internal(anyhow::anyhow!("failed to create temp file: {e}"))
537 })?;
538 let path = temp.path().to_path_buf();
539
540 temp.persist(&path).map_err(|e| {
542 S3ServiceError::Internal(anyhow::anyhow!(
543 "failed to persist temp file {}: {e}",
544 path.display()
545 ))
546 })?;
547
548 tokio::fs::write(&path, data).await.map_err(|e| {
550 S3ServiceError::Internal(anyhow::anyhow!(
551 "failed to write temp file {}: {e}",
552 path.display()
553 ))
554 })?;
555
556 trace!(path = %path.display(), size, "spilled data to disk");
557 Ok(StoredData::OnDisk { path, size })
558 }
559
560 fn remove_parts_by_bucket(&self, bucket: &str) -> usize {
563 let before = self.parts.len();
564 self.parts.retain(|key, _| key.0 != bucket);
565 before - self.parts.len()
566 }
567}
568
#[cfg(test)]
mod tests {
    use super::*;

    /// Spill threshold used by the tests; kept tiny so small payloads can
    /// exercise the on-disk path.
    const TEST_THRESHOLD: usize = 64;

    /// Storage configured with the test threshold.
    fn new_storage() -> InMemoryStorage {
        InMemoryStorage::new(TEST_THRESHOLD)
    }

    /// Payload that stays in memory.
    fn small_data() -> Bytes {
        Bytes::from_static(b"hello world")
    }

    /// Payload one byte over the threshold, so it is spilled to disk.
    fn large_data() -> Bytes {
        Bytes::from(vec![0xAB_u8; TEST_THRESHOLD + 1])
    }

    // ---- objects ----

    #[tokio::test]
    async fn test_should_write_and_read_small_object() {
        let store = new_storage();
        let payload = small_data();

        let outcome = store
            .write_object("bucket", "key", "null", payload.clone())
            .await;
        assert!(outcome.is_ok());

        let written = outcome.unwrap_or_else(|e| panic!("write_object failed: {e}"));
        assert_eq!(written.size, payload.len() as u64);
        assert!(written.etag.starts_with('"'));
        assert!(written.etag.ends_with('"'));
        assert_eq!(written.md5_hex, checksums::compute_md5(&payload));

        let fetched = store
            .read_object("bucket", "key", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read_object failed: {e}"));
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_should_write_and_read_large_object_on_disk() {
        let store = new_storage();
        let payload = large_data();

        let written = store
            .write_object("bucket", "big", "null", payload.clone())
            .await
            .unwrap_or_else(|e| panic!("write_object failed: {e}"));
        assert_eq!(written.size, payload.len() as u64);

        let fetched = store
            .read_object("bucket", "big", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read_object failed: {e}"));
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_should_read_object_with_range() {
        let store = new_storage();
        store
            .write_object("bucket", "key", "null", Bytes::from("hello world"))
            .await
            .unwrap_or_else(|e| panic!("write failed: {e}"));

        // First five bytes.
        let head = store
            .read_object("bucket", "key", "null", Some((0, 4)))
            .await
            .unwrap_or_else(|e| panic!("range read failed: {e}"));
        assert_eq!(head.as_ref(), b"hello");

        // Last five bytes.
        let tail = store
            .read_object("bucket", "key", "null", Some((6, 10)))
            .await
            .unwrap_or_else(|e| panic!("range read failed: {e}"));
        assert_eq!(tail.as_ref(), b"world");
    }

    #[tokio::test]
    async fn test_should_reject_invalid_range() {
        let store = new_storage();
        store
            .write_object("bucket", "key", "null", Bytes::from("abc"))
            .await
            .unwrap_or_else(|e| panic!("write failed: {e}"));

        // Start after end, then end past the payload.
        for bad_range in [(2_u64, 1_u64), (0, 100)] {
            let outcome = store
                .read_object("bucket", "key", "null", Some(bad_range))
                .await;
            assert!(matches!(outcome, Err(S3ServiceError::InvalidRange)));
        }
    }

    #[tokio::test]
    async fn test_should_copy_object() {
        let store = new_storage();
        let payload = small_data();
        store
            .write_object("src-bucket", "src-key", "null", payload.clone())
            .await
            .unwrap_or_else(|e| panic!("write failed: {e}"));

        let copied = store
            .copy_object(
                "src-bucket",
                "src-key",
                "null",
                "dst-bucket",
                "dst-key",
                "v1",
            )
            .await
            .unwrap_or_else(|e| panic!("copy failed: {e}"));
        assert_eq!(copied.size, payload.len() as u64);

        let at_destination = store
            .read_object("dst-bucket", "dst-key", "v1", None)
            .await
            .unwrap_or_else(|e| panic!("read dst failed: {e}"));
        assert_eq!(at_destination, payload);

        // The source must survive the copy.
        let at_source = store
            .read_object("src-bucket", "src-key", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read src failed: {e}"));
        assert_eq!(at_source, payload);
    }

    #[tokio::test]
    async fn test_should_return_error_on_copy_nonexistent_source() {
        let store = new_storage();
        let outcome = store
            .copy_object("bucket", "missing", "null", "dst", "key", "null")
            .await;
        assert!(matches!(outcome, Err(S3ServiceError::NoSuchKey { .. })));
    }

    #[tokio::test]
    async fn test_should_delete_object() {
        let store = new_storage();
        store
            .write_object("bucket", "key", "null", small_data())
            .await
            .unwrap_or_else(|e| panic!("write failed: {e}"));

        store.delete_object("bucket", "key", "null");

        let outcome = store.read_object("bucket", "key", "null", None).await;
        assert!(matches!(outcome, Err(S3ServiceError::NoSuchKey { .. })));
    }

    #[tokio::test]
    async fn test_should_not_panic_on_delete_nonexistent() {
        let store = new_storage();
        // Must simply return.
        store.delete_object("bucket", "ghost", "null");
    }

    // ---- multipart parts ----

    #[tokio::test]
    async fn test_should_write_and_read_part() {
        let store = new_storage();
        let payload = Bytes::from("part-data");

        let written = store
            .write_part("bucket", "upload-1", 1, payload.clone())
            .await
            .unwrap_or_else(|e| panic!("write_part failed: {e}"));
        assert_eq!(written.size, payload.len() as u64);

        let fetched = store
            .read_part("bucket", "upload-1", 1)
            .await
            .unwrap_or_else(|e| panic!("read_part failed: {e}"));
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_should_return_error_on_read_missing_part() {
        let store = new_storage();
        let outcome = store.read_part("bucket", "upload-1", 99).await;
        assert!(matches!(outcome, Err(S3ServiceError::InvalidPart)));
    }

    #[tokio::test]
    async fn test_should_complete_multipart_upload() {
        let store = new_storage();

        let first = Bytes::from("hello ");
        let second = Bytes::from("world");

        store
            .write_part("bucket", "upload-1", 1, first.clone())
            .await
            .unwrap_or_else(|e| panic!("write part 1 failed: {e}"));
        store
            .write_part("bucket", "upload-1", 2, second.clone())
            .await
            .unwrap_or_else(|e| panic!("write part 2 failed: {e}"));

        let (written, part_md5s) = store
            .complete_multipart("bucket", "upload-1", "assembled-key", "null", &[1, 2])
            .await
            .unwrap_or_else(|e| panic!("complete_multipart failed: {e}"));

        // Size is the sum of the parts.
        assert_eq!(written.size, (first.len() + second.len()) as u64);

        // Composite ETag carries the part count.
        assert!(
            written.etag.contains("-2"),
            "expected composite ETag, got {}",
            written.etag
        );

        // One MD5 per part, in order.
        assert_eq!(part_md5s.len(), 2);
        assert_eq!(part_md5s[0], checksums::compute_md5(&first));
        assert_eq!(part_md5s[1], checksums::compute_md5(&second));

        // The assembled object is the concatenation of the parts.
        let assembled = store
            .read_object("bucket", "assembled-key", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read assembled object failed: {e}"));
        assert_eq!(assembled.as_ref(), b"hello world");

        // Parts are gone once the upload is completed.
        let leftover = store.read_part("bucket", "upload-1", 1).await;
        assert!(
            matches!(leftover, Err(S3ServiceError::InvalidPart)),
            "parts should be cleaned up after complete"
        );
    }

    #[tokio::test]
    async fn test_should_return_error_on_complete_with_missing_part() {
        let store = new_storage();
        store
            .write_part("bucket", "upload-1", 1, Bytes::from("data"))
            .await
            .unwrap_or_else(|e| panic!("write part failed: {e}"));

        // Part 2 was never uploaded.
        let outcome = store
            .complete_multipart("bucket", "upload-1", "key", "null", &[1, 2])
            .await;
        assert!(matches!(outcome, Err(S3ServiceError::InvalidPart)));
    }

    #[tokio::test]
    async fn test_should_abort_multipart() {
        let store = new_storage();
        store
            .write_part("bucket", "upload-1", 1, Bytes::from("a"))
            .await
            .unwrap_or_else(|e| panic!("write part 1 failed: {e}"));
        store
            .write_part("bucket", "upload-1", 2, Bytes::from("b"))
            .await
            .unwrap_or_else(|e| panic!("write part 2 failed: {e}"));

        // A second upload in the same bucket that must not be affected.
        store
            .write_part("bucket", "upload-2", 1, Bytes::from("c"))
            .await
            .unwrap_or_else(|e| panic!("write part for upload-2 failed: {e}"));

        store.abort_multipart("bucket", "upload-1");

        // Both parts of the aborted upload are gone.
        assert!(matches!(
            store.read_part("bucket", "upload-1", 1).await,
            Err(S3ServiceError::InvalidPart)
        ));
        assert!(matches!(
            store.read_part("bucket", "upload-1", 2).await,
            Err(S3ServiceError::InvalidPart)
        ));

        // The other upload is intact.
        let survivor = store
            .read_part("bucket", "upload-2", 1)
            .await
            .unwrap_or_else(|e| panic!("read part for upload-2 failed: {e}"));
        assert_eq!(survivor.as_ref(), b"c");
    }

    // ---- bulk removal ----

    #[tokio::test]
    async fn test_should_delete_bucket_data() {
        let store = new_storage();
        store
            .write_object("target", "obj1", "null", Bytes::from("a"))
            .await
            .unwrap_or_else(|e| panic!("write obj1 failed: {e}"));
        store
            .write_object("target", "obj2", "null", Bytes::from("b"))
            .await
            .unwrap_or_else(|e| panic!("write obj2 failed: {e}"));
        store
            .write_part("target", "upload-1", 1, Bytes::from("p"))
            .await
            .unwrap_or_else(|e| panic!("write part failed: {e}"));

        // Data in another bucket that must survive.
        store
            .write_object("other", "obj3", "null", Bytes::from("c"))
            .await
            .unwrap_or_else(|e| panic!("write obj3 failed: {e}"));

        store.delete_bucket_data("target");

        // Everything in the target bucket is gone.
        assert!(matches!(
            store.read_object("target", "obj1", "null", None).await,
            Err(S3ServiceError::NoSuchKey { .. })
        ));
        assert!(matches!(
            store.read_object("target", "obj2", "null", None).await,
            Err(S3ServiceError::NoSuchKey { .. })
        ));
        assert!(matches!(
            store.read_part("target", "upload-1", 1).await,
            Err(S3ServiceError::InvalidPart)
        ));

        // The other bucket is untouched.
        let survivor = store
            .read_object("other", "obj3", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read obj3 failed: {e}"));
        assert_eq!(survivor.as_ref(), b"c");
    }

    #[tokio::test]
    async fn test_should_reset_all_storage() {
        let store = new_storage();
        store
            .write_object("b1", "k1", "null", Bytes::from("data1"))
            .await
            .unwrap_or_else(|e| panic!("write1 failed: {e}"));
        store
            .write_object("b2", "k2", "null", Bytes::from("data2"))
            .await
            .unwrap_or_else(|e| panic!("write2 failed: {e}"));
        store
            .write_part("b1", "upload", 1, Bytes::from("part"))
            .await
            .unwrap_or_else(|e| panic!("write part failed: {e}"));

        store.reset();

        assert!(matches!(
            store.read_object("b1", "k1", "null", None).await,
            Err(S3ServiceError::NoSuchKey { .. })
        ));
        assert!(matches!(
            store.read_object("b2", "k2", "null", None).await,
            Err(S3ServiceError::NoSuchKey { .. })
        ));
        assert!(matches!(
            store.read_part("b1", "upload", 1).await,
            Err(S3ServiceError::InvalidPart)
        ));
    }

    // ---- construction ----

    #[test]
    fn test_should_create_default_storage() {
        let store = InMemoryStorage::default();
        assert_eq!(
            InMemoryStorage::default_max_memory_size(),
            DEFAULT_MAX_MEMORY_SIZE
        );
        let rendered = format!("{store:?}");
        assert!(rendered.contains("InMemoryStorage"));
    }

    // ---- on-disk behaviour ----

    #[tokio::test]
    async fn test_should_clean_up_on_overwrite() {
        let store = new_storage();

        // First write is large enough to be spilled to disk.
        store
            .write_object("bucket", "key", "null", large_data())
            .await
            .unwrap_or_else(|e| panic!("write1 failed: {e}"));

        // Overwriting with a small payload replaces the on-disk entry.
        let replacement = Bytes::from("small");
        store
            .write_object("bucket", "key", "null", replacement.clone())
            .await
            .unwrap_or_else(|e| panic!("write2 failed: {e}"));

        let fetched = store
            .read_object("bucket", "key", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read failed: {e}"));
        assert_eq!(fetched, replacement);
    }

    #[tokio::test]
    async fn test_should_write_and_read_large_part_on_disk() {
        let store = new_storage();
        let payload = large_data();

        let written = store
            .write_part("bucket", "upload-big", 1, payload.clone())
            .await
            .unwrap_or_else(|e| panic!("write_part failed: {e}"));
        assert_eq!(written.size, payload.len() as u64);

        let fetched = store
            .read_part("bucket", "upload-big", 1)
            .await
            .unwrap_or_else(|e| panic!("read_part failed: {e}"));
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_should_read_all_bytes_from_large_on_disk_object() {
        // 1 MiB, far above the test threshold.
        const SIZE: usize = 1024 * 1024;
        let store = new_storage();
        let payload = Bytes::from(vec![0x42_u8; SIZE]);

        store
            .write_object("bucket", "big-obj", "null", payload.clone())
            .await
            .unwrap_or_else(|e| panic!("write failed: {e}"));

        let fetched = store
            .read_object("bucket", "big-obj", "null", None)
            .await
            .unwrap_or_else(|e| panic!("read failed: {e}"));

        assert_eq!(
            fetched.len(),
            SIZE,
            "expected {SIZE} bytes, got {}",
            fetched.len()
        );
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_should_return_error_on_read_nonexistent_object() {
        let store = new_storage();
        let outcome = store.read_object("bucket", "ghost", "null", None).await;
        assert!(matches!(outcome, Err(S3ServiceError::NoSuchKey { .. })));
    }
}