Skip to main content

rustack_s3_core/
storage.rs

1//! In-memory storage backend for S3 object body data.
2//!
//! Objects at or below a configurable threshold are kept in memory as [`Bytes`].
//! Objects above the threshold are spilled to temporary files on disk.
5//!
6//! The [`InMemoryStorage`] type is thread-safe and uses [`DashMap`] for
7//! concurrent access to stored objects and multipart parts.
8//!
9//! # Spillover to Disk
10//!
11//! When object data exceeds the configured `max_memory_size`, the bytes
12//! are written to a temporary file via the [`tempfile`] crate. On-disk data
13//! is automatically cleaned up when the entry is removed from the map (via
14//! the internal stored data `Drop` implementation).
15
16use std::path::PathBuf;
17
18use bytes::{Bytes, BytesMut};
19use dashmap::DashMap;
20use tokio::io::AsyncReadExt as _;
21use tracing::{debug, trace, warn};
22
23use crate::{checksums, error::S3ServiceError};
24
/// Composite key identifying a stored object: `(bucket, key, version_id)`.
///
/// Used as the key type of the object map in `InMemoryStorage`.
type StorageKey = (String, String, String);

/// Composite key identifying a multipart part: `(bucket, upload_id, part_number)`.
///
/// Used as the key type of the part map in `InMemoryStorage`.
type PartKey = (String, String, u32);
30
/// Largest payload, in bytes, that is held in memory by default.
///
/// Payloads strictly larger than this are written to temporary files
/// instead. The value is 512 KiB.
const DEFAULT_MAX_MEMORY_SIZE: usize = 512 * 1024;
36
37// ---------------------------------------------------------------------------
38// WriteResult
39// ---------------------------------------------------------------------------
40
/// Result of writing data to storage.
///
/// Contains the computed ETag, data size, and raw MD5 hex digest for the
/// written object or part. Returned by the object and part write paths and,
/// with a composite ETag, by multipart completion.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// The ETag of the written data, including the surrounding double
    /// quotes. For a plain object or part write this is the quoted MD5 hex
    /// digest; for a completed multipart upload it is the composite ETag.
    pub etag: String,
    /// The size of the written data in bytes.
    pub size: u64,
    /// The MD5 hex digest (unquoted). For a completed multipart upload this
    /// is the hash portion of the composite ETag, without the part-count
    /// suffix.
    pub md5_hex: String,
}
54
55// ---------------------------------------------------------------------------
56// StoredData
57// ---------------------------------------------------------------------------
58
/// Internal representation of stored data.
///
/// Small objects are kept in memory as [`Bytes`]. Large objects are spilled
/// to a temporary file on disk. When a [`StoredData::OnDisk`] value is
/// dropped, removal of the temporary file is scheduled on the current tokio
/// runtime (if one is active).
///
/// The type deliberately has no `Clone`: each `OnDisk` value is the single
/// owner of its file, and its `Drop` is what triggers the cleanup.
enum StoredData {
    /// Small objects kept entirely in memory.
    InMemory {
        /// The raw object bytes.
        data: Bytes,
    },
    /// Large objects spilled to a temp file.
    OnDisk {
        /// Path to the temporary file holding the bytes.
        path: PathBuf,
        /// Size of the stored data in bytes, recorded when the file was
        /// written.
        size: u64,
    },
}
78
79impl std::fmt::Debug for StoredData {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            Self::InMemory { data } => f
83                .debug_struct("InMemory")
84                .field("size", &data.len())
85                .finish(),
86            Self::OnDisk { path, size } => f
87                .debug_struct("OnDisk")
88                .field("path", path)
89                .field("size", size)
90                .finish(),
91        }
92    }
93}
94
95impl Drop for StoredData {
96    fn drop(&mut self) {
97        if let Self::OnDisk { path, .. } = self {
98            let path = path.clone();
99            // Spawn async file removal on the tokio runtime. If no runtime is
100            // active (e.g. during test teardown), the file will be cleaned up
101            // by the OS temp-dir reaper.
102            if let Ok(handle) = tokio::runtime::Handle::try_current() {
103                handle.spawn(async move {
104                    if let Err(e) = tokio::fs::remove_file(&path).await {
105                        if e.kind() != std::io::ErrorKind::NotFound {
106                            warn!(path = %path.display(), error = %e, "failed to remove temp file");
107                        }
108                    } else {
109                        trace!(path = %path.display(), "removed temp file");
110                    }
111                });
112            }
113        }
114    }
115}
116
117impl StoredData {
118    /// Read the full data from this stored entry.
119    async fn read_all(&self) -> Result<Bytes, S3ServiceError> {
120        match self {
121            Self::InMemory { data } => Ok(data.clone()),
122            Self::OnDisk { path, size } => {
123                let mut file = tokio::fs::File::open(path).await.map_err(|e| {
124                    S3ServiceError::Internal(anyhow::anyhow!(
125                        "failed to open temp file {}: {e}",
126                        path.display()
127                    ))
128                })?;
129                let capacity = usize::try_from(*size).unwrap_or(usize::MAX);
130                let mut buf = Vec::with_capacity(capacity);
131                file.read_to_end(&mut buf).await.map_err(|e| {
132                    S3ServiceError::Internal(anyhow::anyhow!(
133                        "failed to read temp file {}: {e}",
134                        path.display()
135                    ))
136                })?;
137                Ok(Bytes::from(buf))
138            }
139        }
140    }
141}
142
143// ---------------------------------------------------------------------------
144// InMemoryStorage
145// ---------------------------------------------------------------------------
146
147/// In-memory storage with automatic spillover to tempfiles for large objects.
148///
149/// Thread-safe: uses [`DashMap`] for concurrent access. Objects larger than
150/// the configured `max_memory_size` are transparently written to
151/// temporary files and read back on demand.
152///
153/// # Examples
154///
155/// ```
156/// use bytes::Bytes;
157/// use rustack_s3_core::storage::InMemoryStorage;
158///
159/// # tokio_test::block_on(async {
160/// let storage = InMemoryStorage::new(1024);
161/// let result = storage
162///     .write_object("my-bucket", "hello.txt", "null", Bytes::from("hello"))
163///     .await
164///     .unwrap();
165/// assert_eq!(result.size, 5);
166///
167/// let data = storage
168///     .read_object("my-bucket", "hello.txt", "null", None)
169///     .await
170///     .unwrap();
171/// assert_eq!(data.as_ref(), b"hello");
172/// # });
173/// ```
pub struct InMemoryStorage {
    /// Object data keyed by `(bucket, key, version_id)`.
    objects: DashMap<StorageKey, StoredData>,
    /// Multipart part data keyed by `(bucket, upload_id, part_number)`.
    /// Entries are removed when the upload is completed or aborted, or when
    /// the bucket's data is deleted.
    parts: DashMap<PartKey, StoredData>,
    /// Max size in bytes for in-memory storage before spilling to disk.
    /// Payloads strictly larger than this value go to a temp file.
    max_memory_size: usize,
}
182
183impl std::fmt::Debug for InMemoryStorage {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("InMemoryStorage")
186            .field("objects_count", &self.objects.len())
187            .field("parts_count", &self.parts.len())
188            .field("max_memory_size", &self.max_memory_size)
189            .finish()
190    }
191}
192
193impl Default for InMemoryStorage {
194    fn default() -> Self {
195        Self::new(DEFAULT_MAX_MEMORY_SIZE)
196    }
197}
198
199impl InMemoryStorage {
200    /// Create a new storage backend with the given memory threshold.
201    ///
202    /// Objects larger than `max_memory_size` bytes are spilled to temporary
203    /// files on disk.
204    #[must_use]
205    pub fn new(max_memory_size: usize) -> Self {
206        debug!(max_memory_size, "creating InMemoryStorage");
207        Self {
208            objects: DashMap::new(),
209            parts: DashMap::new(),
210            max_memory_size,
211        }
212    }
213
214    /// Return the default maximum in-memory object size (512 KiB).
215    #[must_use]
216    pub fn default_max_memory_size() -> usize {
217        DEFAULT_MAX_MEMORY_SIZE
218    }
219
220    /// Store object data. Computes MD5 and returns a [`WriteResult`].
221    ///
222    /// If the data exceeds the configured memory threshold, it is spilled
223    /// to a temporary file on disk.
224    ///
225    /// # Errors
226    ///
227    /// Returns [`S3ServiceError::Internal`] if the temporary file cannot be
228    /// created or written.
229    pub async fn write_object(
230        &self,
231        bucket: &str,
232        key: &str,
233        version_id: &str,
234        data: Bytes,
235    ) -> Result<WriteResult, S3ServiceError> {
236        let md5_hex = checksums::compute_md5(&data);
237        let etag = format!("\"{md5_hex}\"");
238        let size = data.len() as u64;
239
240        let stored = self.store_data(data).await?;
241
242        trace!(bucket, key, version_id, size, "stored object data");
243        self.objects.insert(
244            (bucket.to_owned(), key.to_owned(), version_id.to_owned()),
245            stored,
246        );
247
248        Ok(WriteResult {
249            etag,
250            size,
251            md5_hex,
252        })
253    }
254
255    /// Read object data. Returns the full [`Bytes`] for the object.
256    ///
257    /// If `range` is specified as `(start, end)` (inclusive on both ends),
258    /// only that byte range is returned.
259    ///
260    /// # Errors
261    ///
262    /// - [`S3ServiceError::NoSuchKey`] if the object is not found.
263    /// - [`S3ServiceError::InvalidRange`] if the range is out of bounds.
264    /// - [`S3ServiceError::Internal`] if the on-disk file cannot be read.
265    pub async fn read_object(
266        &self,
267        bucket: &str,
268        key: &str,
269        version_id: &str,
270        range: Option<(u64, u64)>,
271    ) -> Result<Bytes, S3ServiceError> {
272        let storage_key = (bucket.to_owned(), key.to_owned(), version_id.to_owned());
273        let entry = self
274            .objects
275            .get(&storage_key)
276            .ok_or_else(|| S3ServiceError::NoSuchKey {
277                key: key.to_owned(),
278            })?;
279
280        let all_data = entry.value().read_all().await?;
281
282        match range {
283            Some((start, end)) => {
284                let data_len = all_data.len();
285                let start_idx = usize::try_from(start).map_err(|_| S3ServiceError::InvalidRange)?;
286                let end_idx = usize::try_from(end).map_err(|_| S3ServiceError::InvalidRange)?;
287                if start_idx >= data_len || end_idx >= data_len || start_idx > end_idx {
288                    return Err(S3ServiceError::InvalidRange);
289                }
290                Ok(all_data.slice(start_idx..=end_idx))
291            }
292            None => Ok(all_data),
293        }
294    }
295
296    /// Copy object data from one location to another.
297    ///
298    /// Reads the source object data, then writes it to the destination
299    /// location. Returns a [`WriteResult`] for the destination object.
300    ///
301    /// # Errors
302    ///
303    /// - [`S3ServiceError::NoSuchKey`] if the source object is not found.
304    /// - [`S3ServiceError::Internal`] if disk I/O fails.
305    pub async fn copy_object(
306        &self,
307        src_bucket: &str,
308        src_key: &str,
309        src_version_id: &str,
310        dst_bucket: &str,
311        dst_key: &str,
312        dst_version_id: &str,
313    ) -> Result<WriteResult, S3ServiceError> {
314        let data = self
315            .read_object(src_bucket, src_key, src_version_id, None)
316            .await?;
317
318        debug!(
319            src_bucket,
320            src_key,
321            src_version_id,
322            dst_bucket,
323            dst_key,
324            dst_version_id,
325            size = data.len(),
326            "copying object data"
327        );
328
329        self.write_object(dst_bucket, dst_key, dst_version_id, data)
330            .await
331    }
332
333    /// Delete object data.
334    ///
335    /// Removes the stored data for the given object key. If the data was on
336    /// disk, the temporary file is cleaned up via the [`Drop`] implementation.
337    /// This is a no-op if the object does not exist.
338    pub fn delete_object(&self, bucket: &str, key: &str, version_id: &str) {
339        let storage_key = (bucket.to_owned(), key.to_owned(), version_id.to_owned());
340        if self.objects.remove(&storage_key).is_some() {
341            trace!(bucket, key, version_id, "deleted object data");
342        }
343    }
344
345    /// Store a multipart part.
346    ///
347    /// Computes MD5 and returns a [`WriteResult`]. If the part data exceeds
348    /// the memory threshold, it is spilled to a temporary file.
349    ///
350    /// # Errors
351    ///
352    /// Returns [`S3ServiceError::Internal`] if the temporary file cannot be
353    /// created or written.
354    pub async fn write_part(
355        &self,
356        bucket: &str,
357        upload_id: &str,
358        part_number: u32,
359        data: Bytes,
360    ) -> Result<WriteResult, S3ServiceError> {
361        let md5_hex = checksums::compute_md5(&data);
362        let etag = format!("\"{md5_hex}\"");
363        let size = data.len() as u64;
364
365        let stored = self.store_data(data).await?;
366
367        trace!(bucket, upload_id, part_number, size, "stored part data");
368        self.parts.insert(
369            (bucket.to_owned(), upload_id.to_owned(), part_number),
370            stored,
371        );
372
373        Ok(WriteResult {
374            etag,
375            size,
376            md5_hex,
377        })
378    }
379
380    /// Read a multipart part's data.
381    ///
382    /// # Errors
383    ///
384    /// - [`S3ServiceError::InvalidPart`] if the part does not exist.
385    /// - [`S3ServiceError::Internal`] if the on-disk file cannot be read.
386    pub async fn read_part(
387        &self,
388        bucket: &str,
389        upload_id: &str,
390        part_number: u32,
391    ) -> Result<Bytes, S3ServiceError> {
392        let part_key = (bucket.to_owned(), upload_id.to_owned(), part_number);
393        let entry = self
394            .parts
395            .get(&part_key)
396            .ok_or(S3ServiceError::InvalidPart)?;
397
398        entry.value().read_all().await
399    }
400
401    /// Assemble parts into a final object. Concatenates part data in order.
402    ///
403    /// Returns a tuple of `(WriteResult, Vec<String>)` where the vector
404    /// contains the individual (unquoted) MD5 hex digests for each part.
405    /// The [`WriteResult::etag`] is a composite ETag in the format
406    /// `"<md5>-<part_count>"`.
407    ///
408    /// # Errors
409    ///
410    /// - [`S3ServiceError::InvalidPart`] if any requested part does not exist.
411    /// - [`S3ServiceError::Internal`] if disk I/O fails.
412    pub async fn complete_multipart(
413        &self,
414        bucket: &str,
415        upload_id: &str,
416        key: &str,
417        version_id: &str,
418        part_numbers: &[u32],
419    ) -> Result<(WriteResult, Vec<String>), S3ServiceError> {
420        let mut combined = BytesMut::new();
421        let mut part_md5_hexes = Vec::with_capacity(part_numbers.len());
422
423        for &part_number in part_numbers {
424            let part_data = self.read_part(bucket, upload_id, part_number).await?;
425            let md5_hex = checksums::compute_md5(&part_data);
426            part_md5_hexes.push(md5_hex);
427            combined.extend_from_slice(&part_data);
428        }
429
430        let combined_bytes = combined.freeze();
431        let size = combined_bytes.len() as u64;
432
433        // Compute composite ETag: MD5-of-concatenated-MD5s with part count suffix.
434        let etag = checksums::compute_multipart_etag(&part_md5_hexes, part_numbers.len());
435
436        // Store the assembled object.
437        let stored = self.store_data(combined_bytes).await?;
438        self.objects.insert(
439            (bucket.to_owned(), key.to_owned(), version_id.to_owned()),
440            stored,
441        );
442
443        // Clean up the parts for this upload.
444        self.abort_multipart(bucket, upload_id);
445
446        debug!(
447            bucket,
448            upload_id,
449            key,
450            version_id,
451            size,
452            parts = part_numbers.len(),
453            "completed multipart upload"
454        );
455
456        // The md5_hex for the composite result is the hash portion of the ETag
457        // (without quotes and without the -N suffix).
458        let composite_md5 = etag
459            .trim_matches('"')
460            .split('-')
461            .next()
462            .unwrap_or_default()
463            .to_owned();
464
465        Ok((
466            WriteResult {
467                etag,
468                size,
469                md5_hex: composite_md5,
470            },
471            part_md5_hexes,
472        ))
473    }
474
475    /// Delete all parts for a multipart upload.
476    ///
477    /// Removes all stored part data associated with the given upload ID.
478    /// Temporary files are cleaned up automatically via [`Drop`].
479    pub fn abort_multipart(&self, bucket: &str, upload_id: &str) {
480        self.parts.retain(|key, _| {
481            let matches = key.0 == bucket && key.1 == upload_id;
482            if matches {
483                trace!(bucket, upload_id, part_number = key.2, "removing part data");
484            }
485            !matches
486        });
487    }
488
489    /// Delete all data (objects and parts) for a bucket.
490    ///
491    /// This removes both object data and any in-progress multipart part data
492    /// associated with the bucket.
493    pub fn delete_bucket_data(&self, bucket: &str) {
494        let obj_before = self.objects.len();
495        self.objects.retain(|key, _| key.0 != bucket);
496        let obj_removed = obj_before - self.objects.len();
497
498        let part_removed = self.remove_parts_by_bucket(bucket);
499
500        debug!(
501            bucket,
502            objects_removed = obj_removed,
503            parts_removed = part_removed,
504            "deleted all bucket data"
505        );
506    }
507
508    /// Reset all storage, removing every object and part.
509    pub fn reset(&self) {
510        debug!("resetting all storage data");
511        self.objects.clear();
512        self.parts.clear();
513    }
514
515    // -----------------------------------------------------------------------
516    // Private helpers
517    // -----------------------------------------------------------------------
518
519    /// Store data either in memory or on disk, depending on size.
520    async fn store_data(&self, data: Bytes) -> Result<StoredData, S3ServiceError> {
521        if data.len() > self.max_memory_size {
522            self.spill_to_disk(&data).await
523        } else {
524            Ok(StoredData::InMemory { data })
525        }
526    }
527
528    /// Write data to a temporary file and return an [`StoredData::OnDisk`].
529    async fn spill_to_disk(&self, data: &[u8]) -> Result<StoredData, S3ServiceError> {
530        let size = data.len() as u64;
531
532        // Create the temp file synchronously (tempfile::NamedTempFile uses
533        // the OS temp directory) then persist it so it is not deleted when
534        // the NamedTempFile handle is dropped -- we manage cleanup in Drop.
535        let temp = tempfile::NamedTempFile::new().map_err(|e| {
536            S3ServiceError::Internal(anyhow::anyhow!("failed to create temp file: {e}"))
537        })?;
538        let path = temp.path().to_path_buf();
539
540        // Persist the named temp file so it is not auto-deleted.
541        temp.persist(&path).map_err(|e| {
542            S3ServiceError::Internal(anyhow::anyhow!(
543                "failed to persist temp file {}: {e}",
544                path.display()
545            ))
546        })?;
547
548        // Write data asynchronously.
549        tokio::fs::write(&path, data).await.map_err(|e| {
550            S3ServiceError::Internal(anyhow::anyhow!(
551                "failed to write temp file {}: {e}",
552                path.display()
553            ))
554        })?;
555
556        trace!(path = %path.display(), size, "spilled data to disk");
557        Ok(StoredData::OnDisk { path, size })
558    }
559
560    /// Remove all part entries whose bucket matches `bucket`.
561    /// Returns the number of entries removed.
562    fn remove_parts_by_bucket(&self, bucket: &str) -> usize {
563        let before = self.parts.len();
564        self.parts.retain(|key, _| key.0 != bucket);
565        before - self.parts.len()
566    }
567}
568
569// ---------------------------------------------------------------------------
570// Tests
571// ---------------------------------------------------------------------------
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576
    /// Threshold for tests: 64 bytes. Anything larger spills to disk.
    /// Kept tiny so the on-disk path can be exercised with small fixtures.
    const TEST_THRESHOLD: usize = 64;
579
580    fn small_data() -> Bytes {
581        Bytes::from("hello world")
582    }
583
584    fn large_data() -> Bytes {
585        Bytes::from(vec![0xAB_u8; TEST_THRESHOLD + 1])
586    }
587
588    // -----------------------------------------------------------------------
589    // Small object write / read
590    // -----------------------------------------------------------------------
591
592    #[tokio::test]
593    async fn test_should_write_and_read_small_object() {
594        let storage = InMemoryStorage::new(TEST_THRESHOLD);
595        let data = small_data();
596        let result = storage
597            .write_object("bucket", "key", "null", data.clone())
598            .await;
599        assert!(result.is_ok());
600
601        let wr = result.unwrap_or_else(|e| panic!("write_object failed: {e}"));
602        assert_eq!(wr.size, data.len() as u64);
603        assert!(wr.etag.starts_with('"'));
604        assert!(wr.etag.ends_with('"'));
605        assert_eq!(wr.md5_hex, checksums::compute_md5(&data));
606
607        let read_data = storage
608            .read_object("bucket", "key", "null", None)
609            .await
610            .unwrap_or_else(|e| panic!("read_object failed: {e}"));
611        assert_eq!(read_data, data);
612    }
613
614    // -----------------------------------------------------------------------
615    // Large object write / read (spillover)
616    // -----------------------------------------------------------------------
617
618    #[tokio::test]
619    async fn test_should_write_and_read_large_object_on_disk() {
620        let storage = InMemoryStorage::new(TEST_THRESHOLD);
621        let data = large_data();
622        let wr = storage
623            .write_object("bucket", "big", "null", data.clone())
624            .await
625            .unwrap_or_else(|e| panic!("write_object failed: {e}"));
626
627        assert_eq!(wr.size, data.len() as u64);
628
629        let read_data = storage
630            .read_object("bucket", "big", "null", None)
631            .await
632            .unwrap_or_else(|e| panic!("read_object failed: {e}"));
633        assert_eq!(read_data, data);
634    }
635
636    // -----------------------------------------------------------------------
637    // Range reads
638    // -----------------------------------------------------------------------
639
640    #[tokio::test]
641    async fn test_should_read_object_with_range() {
642        let storage = InMemoryStorage::new(TEST_THRESHOLD);
643        let data = Bytes::from("hello world");
644        storage
645            .write_object("bucket", "key", "null", data)
646            .await
647            .unwrap_or_else(|e| panic!("write failed: {e}"));
648
649        // Read bytes 0..=4 => "hello"
650        let range_data = storage
651            .read_object("bucket", "key", "null", Some((0, 4)))
652            .await
653            .unwrap_or_else(|e| panic!("range read failed: {e}"));
654        assert_eq!(range_data.as_ref(), b"hello");
655
656        // Read bytes 6..=10 => "world"
657        let range_data = storage
658            .read_object("bucket", "key", "null", Some((6, 10)))
659            .await
660            .unwrap_or_else(|e| panic!("range read failed: {e}"));
661        assert_eq!(range_data.as_ref(), b"world");
662    }
663
664    #[tokio::test]
665    async fn test_should_reject_invalid_range() {
666        let storage = InMemoryStorage::new(TEST_THRESHOLD);
667        storage
668            .write_object("bucket", "key", "null", Bytes::from("abc"))
669            .await
670            .unwrap_or_else(|e| panic!("write failed: {e}"));
671
672        // start > end
673        let result = storage
674            .read_object("bucket", "key", "null", Some((2, 1)))
675            .await;
676        assert!(matches!(result, Err(S3ServiceError::InvalidRange)));
677
678        // end out of bounds
679        let result = storage
680            .read_object("bucket", "key", "null", Some((0, 100)))
681            .await;
682        assert!(matches!(result, Err(S3ServiceError::InvalidRange)));
683    }
684
685    // -----------------------------------------------------------------------
686    // Copy object
687    // -----------------------------------------------------------------------
688
689    #[tokio::test]
690    async fn test_should_copy_object() {
691        let storage = InMemoryStorage::new(TEST_THRESHOLD);
692        let data = small_data();
693        storage
694            .write_object("src-bucket", "src-key", "null", data.clone())
695            .await
696            .unwrap_or_else(|e| panic!("write failed: {e}"));
697
698        let wr = storage
699            .copy_object(
700                "src-bucket",
701                "src-key",
702                "null",
703                "dst-bucket",
704                "dst-key",
705                "v1",
706            )
707            .await
708            .unwrap_or_else(|e| panic!("copy failed: {e}"));
709        assert_eq!(wr.size, data.len() as u64);
710
711        let dst_data = storage
712            .read_object("dst-bucket", "dst-key", "v1", None)
713            .await
714            .unwrap_or_else(|e| panic!("read dst failed: {e}"));
715        assert_eq!(dst_data, data);
716
717        // Source still exists.
718        let src_data = storage
719            .read_object("src-bucket", "src-key", "null", None)
720            .await
721            .unwrap_or_else(|e| panic!("read src failed: {e}"));
722        assert_eq!(src_data, data);
723    }
724
725    #[tokio::test]
726    async fn test_should_return_error_on_copy_nonexistent_source() {
727        let storage = InMemoryStorage::new(TEST_THRESHOLD);
728        let result = storage
729            .copy_object("bucket", "missing", "null", "dst", "key", "null")
730            .await;
731        assert!(matches!(result, Err(S3ServiceError::NoSuchKey { .. })));
732    }
733
734    // -----------------------------------------------------------------------
735    // Delete object
736    // -----------------------------------------------------------------------
737
738    #[tokio::test]
739    async fn test_should_delete_object() {
740        let storage = InMemoryStorage::new(TEST_THRESHOLD);
741        storage
742            .write_object("bucket", "key", "null", small_data())
743            .await
744            .unwrap_or_else(|e| panic!("write failed: {e}"));
745
746        storage.delete_object("bucket", "key", "null");
747
748        let result = storage.read_object("bucket", "key", "null", None).await;
749        assert!(matches!(result, Err(S3ServiceError::NoSuchKey { .. })));
750    }
751
752    #[tokio::test]
753    async fn test_should_not_panic_on_delete_nonexistent() {
754        let storage = InMemoryStorage::new(TEST_THRESHOLD);
755        // Should be a no-op, not panic.
756        storage.delete_object("bucket", "ghost", "null");
757    }
758
759    // -----------------------------------------------------------------------
760    // Multipart write / read / complete / abort
761    // -----------------------------------------------------------------------
762
763    #[tokio::test]
764    async fn test_should_write_and_read_part() {
765        let storage = InMemoryStorage::new(TEST_THRESHOLD);
766        let data = Bytes::from("part-data");
767        let wr = storage
768            .write_part("bucket", "upload-1", 1, data.clone())
769            .await
770            .unwrap_or_else(|e| panic!("write_part failed: {e}"));
771
772        assert_eq!(wr.size, data.len() as u64);
773
774        let read = storage
775            .read_part("bucket", "upload-1", 1)
776            .await
777            .unwrap_or_else(|e| panic!("read_part failed: {e}"));
778        assert_eq!(read, data);
779    }
780
781    #[tokio::test]
782    async fn test_should_return_error_on_read_missing_part() {
783        let storage = InMemoryStorage::new(TEST_THRESHOLD);
784        let result = storage.read_part("bucket", "upload-1", 99).await;
785        assert!(matches!(result, Err(S3ServiceError::InvalidPart)));
786    }
787
788    #[tokio::test]
789    async fn test_should_complete_multipart_upload() {
790        let storage = InMemoryStorage::new(TEST_THRESHOLD);
791
792        let part1 = Bytes::from("hello ");
793        let part2 = Bytes::from("world");
794
795        storage
796            .write_part("bucket", "upload-1", 1, part1.clone())
797            .await
798            .unwrap_or_else(|e| panic!("write part 1 failed: {e}"));
799        storage
800            .write_part("bucket", "upload-1", 2, part2.clone())
801            .await
802            .unwrap_or_else(|e| panic!("write part 2 failed: {e}"));
803
804        let (wr, part_md5s) = storage
805            .complete_multipart("bucket", "upload-1", "assembled-key", "null", &[1, 2])
806            .await
807            .unwrap_or_else(|e| panic!("complete_multipart failed: {e}"));
808
809        // Size should be the sum of parts.
810        assert_eq!(wr.size, (part1.len() + part2.len()) as u64);
811
812        // ETag should be a composite (contains "-2").
813        assert!(
814            wr.etag.contains("-2"),
815            "expected composite ETag, got {}",
816            wr.etag
817        );
818
819        // Part MD5s should have 2 entries.
820        assert_eq!(part_md5s.len(), 2);
821        assert_eq!(part_md5s[0], checksums::compute_md5(&part1));
822        assert_eq!(part_md5s[1], checksums::compute_md5(&part2));
823
824        // The assembled object should be readable.
825        let data = storage
826            .read_object("bucket", "assembled-key", "null", None)
827            .await
828            .unwrap_or_else(|e| panic!("read assembled object failed: {e}"));
829        assert_eq!(data.as_ref(), b"hello world");
830
831        // Parts should have been cleaned up.
832        let part_read = storage.read_part("bucket", "upload-1", 1).await;
833        assert!(
834            matches!(part_read, Err(S3ServiceError::InvalidPart)),
835            "parts should be cleaned up after complete"
836        );
837    }
838
839    #[tokio::test]
840    async fn test_should_return_error_on_complete_with_missing_part() {
841        let storage = InMemoryStorage::new(TEST_THRESHOLD);
842        storage
843            .write_part("bucket", "upload-1", 1, Bytes::from("data"))
844            .await
845            .unwrap_or_else(|e| panic!("write part failed: {e}"));
846
847        // Part 2 was never uploaded.
848        let result = storage
849            .complete_multipart("bucket", "upload-1", "key", "null", &[1, 2])
850            .await;
851        assert!(matches!(result, Err(S3ServiceError::InvalidPart)));
852    }
853
854    #[tokio::test]
855    async fn test_should_abort_multipart() {
856        let storage = InMemoryStorage::new(TEST_THRESHOLD);
857        storage
858            .write_part("bucket", "upload-1", 1, Bytes::from("a"))
859            .await
860            .unwrap_or_else(|e| panic!("write part 1 failed: {e}"));
861        storage
862            .write_part("bucket", "upload-1", 2, Bytes::from("b"))
863            .await
864            .unwrap_or_else(|e| panic!("write part 2 failed: {e}"));
865
866        // Also write a part for a different upload to verify isolation.
867        storage
868            .write_part("bucket", "upload-2", 1, Bytes::from("c"))
869            .await
870            .unwrap_or_else(|e| panic!("write part for upload-2 failed: {e}"));
871
872        storage.abort_multipart("bucket", "upload-1");
873
874        // upload-1 parts should be gone.
875        assert!(matches!(
876            storage.read_part("bucket", "upload-1", 1).await,
877            Err(S3ServiceError::InvalidPart)
878        ));
879        assert!(matches!(
880            storage.read_part("bucket", "upload-1", 2).await,
881            Err(S3ServiceError::InvalidPart)
882        ));
883
884        // upload-2 part should still exist.
885        let data = storage
886            .read_part("bucket", "upload-2", 1)
887            .await
888            .unwrap_or_else(|e| panic!("read part for upload-2 failed: {e}"));
889        assert_eq!(data.as_ref(), b"c");
890    }
891
892    // -----------------------------------------------------------------------
893    // Delete bucket data
894    // -----------------------------------------------------------------------
895
896    #[tokio::test]
897    async fn test_should_delete_bucket_data() {
898        let storage = InMemoryStorage::new(TEST_THRESHOLD);
899        storage
900            .write_object("target", "obj1", "null", Bytes::from("a"))
901            .await
902            .unwrap_or_else(|e| panic!("write obj1 failed: {e}"));
903        storage
904            .write_object("target", "obj2", "null", Bytes::from("b"))
905            .await
906            .unwrap_or_else(|e| panic!("write obj2 failed: {e}"));
907        storage
908            .write_part("target", "upload-1", 1, Bytes::from("p"))
909            .await
910            .unwrap_or_else(|e| panic!("write part failed: {e}"));
911
912        // Also write to a different bucket.
913        storage
914            .write_object("other", "obj3", "null", Bytes::from("c"))
915            .await
916            .unwrap_or_else(|e| panic!("write obj3 failed: {e}"));
917
918        storage.delete_bucket_data("target");
919
920        // Target bucket data should be gone.
921        assert!(matches!(
922            storage.read_object("target", "obj1", "null", None).await,
923            Err(S3ServiceError::NoSuchKey { .. })
924        ));
925        assert!(matches!(
926            storage.read_object("target", "obj2", "null", None).await,
927            Err(S3ServiceError::NoSuchKey { .. })
928        ));
929        assert!(matches!(
930            storage.read_part("target", "upload-1", 1).await,
931            Err(S3ServiceError::InvalidPart)
932        ));
933
934        // Other bucket should be untouched.
935        let data = storage
936            .read_object("other", "obj3", "null", None)
937            .await
938            .unwrap_or_else(|e| panic!("read obj3 failed: {e}"));
939        assert_eq!(data.as_ref(), b"c");
940    }
941
942    // -----------------------------------------------------------------------
943    // Reset
944    // -----------------------------------------------------------------------
945
946    #[tokio::test]
947    async fn test_should_reset_all_storage() {
948        let storage = InMemoryStorage::new(TEST_THRESHOLD);
949        storage
950            .write_object("b1", "k1", "null", Bytes::from("data1"))
951            .await
952            .unwrap_or_else(|e| panic!("write1 failed: {e}"));
953        storage
954            .write_object("b2", "k2", "null", Bytes::from("data2"))
955            .await
956            .unwrap_or_else(|e| panic!("write2 failed: {e}"));
957        storage
958            .write_part("b1", "upload", 1, Bytes::from("part"))
959            .await
960            .unwrap_or_else(|e| panic!("write part failed: {e}"));
961
962        storage.reset();
963
964        assert!(matches!(
965            storage.read_object("b1", "k1", "null", None).await,
966            Err(S3ServiceError::NoSuchKey { .. })
967        ));
968        assert!(matches!(
969            storage.read_object("b2", "k2", "null", None).await,
970            Err(S3ServiceError::NoSuchKey { .. })
971        ));
972        assert!(matches!(
973            storage.read_part("b1", "upload", 1).await,
974            Err(S3ServiceError::InvalidPart)
975        ));
976    }
977
978    // -----------------------------------------------------------------------
979    // Default and Debug impls
980    // -----------------------------------------------------------------------
981
/// A default-constructed storage must be creatable, the advertised default
/// threshold must equal `DEFAULT_MAX_MEMORY_SIZE`, and the `Debug` output
/// must mention the type name.
#[test]
fn test_should_create_default_storage() {
    let storage = InMemoryStorage::default();

    let advertised_threshold = InMemoryStorage::default_max_memory_size();
    assert_eq!(advertised_threshold, DEFAULT_MAX_MEMORY_SIZE);

    let rendered = format!("{storage:?}");
    let mentions_type_name = rendered.contains("InMemoryStorage");
    assert!(mentions_type_name);
}
992
993    // -----------------------------------------------------------------------
994    // On-disk cleanup on overwrite
995    // -----------------------------------------------------------------------
996
997    #[tokio::test]
998    async fn test_should_clean_up_on_overwrite() {
999        let storage = InMemoryStorage::new(TEST_THRESHOLD);
1000        let data1 = large_data();
1001
1002        storage
1003            .write_object("bucket", "key", "null", data1)
1004            .await
1005            .unwrap_or_else(|e| panic!("write1 failed: {e}"));
1006
1007        // Overwrite with new data. The old temp file should be cleaned up
1008        // via Drop when the DashMap entry is replaced.
1009        let data2 = Bytes::from("small");
1010        storage
1011            .write_object("bucket", "key", "null", data2.clone())
1012            .await
1013            .unwrap_or_else(|e| panic!("write2 failed: {e}"));
1014
1015        let read = storage
1016            .read_object("bucket", "key", "null", None)
1017            .await
1018            .unwrap_or_else(|e| panic!("read failed: {e}"));
1019        assert_eq!(read, data2);
1020    }
1021
1022    // -----------------------------------------------------------------------
1023    // Large part spillover
1024    // -----------------------------------------------------------------------
1025
1026    #[tokio::test]
1027    async fn test_should_write_and_read_large_part_on_disk() {
1028        let storage = InMemoryStorage::new(TEST_THRESHOLD);
1029        let data = large_data();
1030
1031        let wr = storage
1032            .write_part("bucket", "upload-big", 1, data.clone())
1033            .await
1034            .unwrap_or_else(|e| panic!("write_part failed: {e}"));
1035        assert_eq!(wr.size, data.len() as u64);
1036
1037        let read = storage
1038            .read_part("bucket", "upload-big", 1)
1039            .await
1040            .unwrap_or_else(|e| panic!("read_part failed: {e}"));
1041        assert_eq!(read, data);
1042    }
1043
1044    // -----------------------------------------------------------------------
1045    // Large on-disk object read completeness
1046    // -----------------------------------------------------------------------
1047
1048    #[tokio::test]
1049    async fn test_should_read_all_bytes_from_large_on_disk_object() {
1050        // Use a 1 MiB object with a 64-byte threshold to force disk spillover.
1051        // This verifies that read_all returns the complete data rather than
1052        // only the bytes returned by a single read syscall.
1053        const SIZE: usize = 1024 * 1024;
1054        let storage = InMemoryStorage::new(TEST_THRESHOLD);
1055        let data = Bytes::from(vec![0x42_u8; SIZE]);
1056
1057        storage
1058            .write_object("bucket", "big-obj", "null", data.clone())
1059            .await
1060            .unwrap_or_else(|e| panic!("write failed: {e}"));
1061
1062        let read_data = storage
1063            .read_object("bucket", "big-obj", "null", None)
1064            .await
1065            .unwrap_or_else(|e| panic!("read failed: {e}"));
1066
1067        assert_eq!(
1068            read_data.len(),
1069            SIZE,
1070            "expected {SIZE} bytes, got {}",
1071            read_data.len()
1072        );
1073        assert_eq!(read_data, data);
1074    }
1075
1076    // -----------------------------------------------------------------------
1077    // Read nonexistent object
1078    // -----------------------------------------------------------------------
1079
1080    #[tokio::test]
1081    async fn test_should_return_error_on_read_nonexistent_object() {
1082        let storage = InMemoryStorage::new(TEST_THRESHOLD);
1083        let result = storage.read_object("bucket", "ghost", "null", None).await;
1084        assert!(matches!(result, Err(S3ServiceError::NoSuchKey { .. })));
1085    }
1086}