Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Top-level snapshot of persisted S3 state, as produced by `S3Store::load`.
///
/// Every field carries `#[serde(default)]`, so a document that omits any of
/// them still deserializes.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    /// Account identifier; empty string when absent from the serialized form.
    #[serde(default)]
    pub account_id: String,
    /// Region name; empty string when absent from the serialized form.
    #[serde(default)]
    pub region: String,
    /// Per-bucket snapshots. `DiskS3Store::load` keys this map by
    /// `BucketMeta::name`.
    #[serde(default)]
    pub buckets: HashMap<String, BucketSnapshot>,
}
18
/// Everything loaded for a single bucket: its metadata, objects, raw
/// sub-resource documents and in-flight multipart uploads.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket-level metadata (required when deserializing).
    pub meta: BucketMeta,
    /// Current object per key. `DiskS3Store::load` keys this by
    /// `ObjectMeta::key`.
    #[serde(default)]
    pub objects: HashMap<String, LoadedObject>,
    /// All versioned entries per key. `DiskS3Store::load` sorts each list by
    /// `last_modified`, oldest first.
    #[serde(default)]
    pub object_versions: HashMap<String, Vec<LoadedObject>>,
    /// Raw sub-resource text. `DiskS3Store::load` keys this by the sidecar
    /// file name (for example `"tags.toml"`).
    #[serde(default)]
    pub subresources: HashMap<String, String>,
    /// In-progress multipart uploads. `DiskS3Store::load` keys this by
    /// `MpuInit::upload_id`.
    #[serde(default)]
    pub multipart_uploads: HashMap<String, LoadedMpu>,
}
31
/// A multipart upload as loaded from a store: its initiation record plus the
/// parts uploaded so far.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Initiation record for the upload (required when deserializing).
    pub init: MpuInit,
    /// Uploaded parts keyed by part number; `BTreeMap` keeps them in
    /// ascending part-number order.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One multipart-upload part: its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata (required when deserializing).
    pub meta: UploadPartMeta,
    /// Where the part's bytes live; falls back to `BodyRef::default()` (an
    /// empty in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One stored object (or object version): its metadata and a reference to its
/// body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata (required when deserializing).
    pub meta: ObjectMeta,
    /// Where the object's bytes live; falls back to `BodyRef::default()` (an
    /// empty in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata. `DiskS3Store` persists it as `meta.toml` inside the
/// bucket directory.
///
/// All fields are optional on deserialization.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name; `DiskS3Store::load` uses it as the key in
    /// `S3State::buckets`.
    #[serde(default)]
    pub name: String,
    /// Creation timestamp; falls back to `default_time()` when missing from
    /// the serialized form.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    /// Region recorded for the bucket.
    #[serde(default)]
    pub region: String,
    /// Versioning state as an opaque string. When `None`,
    /// `DiskS3Store::load` backfills it from the trimmed contents of
    /// `versioning.toml` if that file is non-empty.
    #[serde(default)]
    pub versioning: Option<String>,
    /// ACL stored as an opaque string.
    #[serde(default)]
    pub acl: Option<String>,
    /// Owner id associated with the ACL.
    #[serde(default)]
    pub acl_owner_id: String,
    /// Accelerate status as an opaque string.
    #[serde(default)]
    pub accelerate_status: Option<String>,
    /// EventBridge flag; `false` when absent.
    #[serde(default)]
    pub eventbridge_enabled: bool,
}
72
73fn default_time() -> DateTime<Utc> {
74    DateTime::<Utc>::MIN_UTC
75}
76
/// Serialized form of a single ACL grant.
///
/// `grantee_type` and `permission` are required when deserializing; the
/// grantee detail fields are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Grantee kind, stored as an opaque string.
    pub grantee_type: String,
    /// Grantee id, if any.
    #[serde(default)]
    pub grantee_id: Option<String>,
    /// Grantee display name, if any.
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    /// Grantee URI, if any.
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Granted permission, stored as an opaque string.
    pub permission: String,
}
88
/// Serialized tag set: a map of tag key to tag value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    /// Tag key/value pairs; empty when absent.
    #[serde(default)]
    pub tags: HashMap<String, String>,
}
94
/// Serialized ACL: an owner id plus a list of grants.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    /// Owner id; empty string when absent.
    #[serde(default)]
    pub owner_id: String,
    /// Grants in stored order; empty when absent.
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}
102
/// Serialized inventory configurations held as a string-to-string map.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    /// Stored configurations; empty when absent.
    // NOTE(review): presumably keyed by configuration id with the raw
    // configuration document as value — confirm against the writer.
    #[serde(default)]
    pub configs: HashMap<String, String>,
}
108
/// Per-object (or per-version) metadata. `DiskS3Store` persists it as the
/// `<version-tag>.toml` sidecar next to the object's `.bin` body.
///
/// Every field is optional on deserialization; missing values take the type's
/// default, except `last_modified`, which falls back to `default_time()`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses this (not the directory name) as
    /// the key in the loaded maps.
    #[serde(default)]
    pub key: String,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub etag: String,
    /// Size recorded in the metadata. The byte length reported by a
    /// `BodyRef::Disk` is tracked separately on the body reference.
    #[serde(default)]
    pub size: u64,
    /// Last-modified timestamp; `DiskS3Store::load` orders versions by it.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub tags: HashMap<String, String>,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    #[serde(default)]
    pub parts_count: Option<u32>,
    // NOTE(review): tuple looks like (part number, part size in bytes) —
    // confirm against the code that fills it.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// Version id. `DiskS3Store::load` treats an entry as the plain
    /// (non-versioned) object only when this is `None` and the file tag is
    /// `"null"`.
    #[serde(default)]
    pub version_id: Option<String>,
    /// Marks a delete marker. Delete markers have no `.bin` body:
    /// `DiskS3Store` writes only the sidecar and loads them with an empty
    /// in-memory body.
    #[serde(default)]
    pub is_delete_marker: bool,
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    #[serde(default)]
    pub restore_expiry: Option<String>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    #[serde(default)]
    pub checksum_value: Option<String>,
    #[serde(default)]
    pub lock_mode: Option<String>,
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    #[serde(default)]
    pub content_encoding: Option<String>,
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
164
/// Initiation record of a multipart upload. `DiskS3Store` persists it as
/// `init.toml` inside the upload's directory.
///
/// `upload_id` and `key` are required when deserializing; everything else is
/// optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    /// Upload identifier; `DiskS3Store::load` uses it as the key in
    /// `BucketSnapshot::multipart_uploads`.
    pub upload_id: String,
    /// Object key the upload targets.
    pub key: String,
    /// Initiation timestamp; falls back to `default_time()` when missing.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Tagging captured at initiation, stored as an opaque string.
    #[serde(default)]
    pub tagging: Option<String>,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}
188
/// Metadata of a single uploaded part. `DiskS3Store::load` reads it from the
/// `<part_number>.toml` sidecar in the upload's `parts` directory.
///
/// `part_number`, `etag` and `size` are required when deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    /// Part number within the upload.
    pub part_number: u32,
    /// ETag recorded for the part.
    pub etag: String,
    /// Size recorded for the part.
    pub size: u64,
    /// Last-modified timestamp; falls back to `default_time()` when missing.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
197
/// Kinds of bucket sub-resource that can be stored independently of the
/// bucket's core metadata.
///
/// `DiskS3Store::subresource_filename` maps each variant to one sidecar file
/// name. When adding a variant, also add it to [`ALL_SUBRESOURCES`], which is
/// maintained by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
}
217
/// Every [`BucketSubresource`] variant, listed in declaration order.
///
/// `DiskS3Store::load` iterates this slice to discover which sidecar files
/// exist for a bucket, so a variant missing from this list would never be
/// loaded. The list is not generated from the enum and must be kept in sync
/// manually.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
];
236
/// Handle to an object or part body: either bytes held in memory or a file on
/// disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Bytes held directly in memory. Marked `#[serde(skip)]`, so this
    /// variant is excluded from serialization and deserialization.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file. `bucket`, `key` and `version` are the identity
    /// `DiskS3Store` uses to build the body-cache key.
    Disk {
        bucket: String,
        key: String,
        /// Version id, or `None` for the `"null"` version tag.
        #[serde(default)]
        version: Option<String>,
        /// Location of the `.bin` file holding the bytes.
        path: PathBuf,
        /// Byte length recorded when the reference was created;
        /// `DiskS3Store::load` fills it from the file's metadata.
        size: u64,
    },
}
250
251impl BodyRef {
252    pub fn size(&self) -> u64 {
253        match self {
254            BodyRef::Memory(b) => b.len() as u64,
255            BodyRef::Disk { size, .. } => *size,
256        }
257    }
258}
259
260impl Default for BodyRef {
261    fn default() -> Self {
262        BodyRef::Memory(Bytes::new())
263    }
264}
265
/// Where the bytes for a write come from, and whether the source is consumed.
#[derive(Debug)]
pub enum BodySource {
    /// Bytes already in memory.
    Bytes(Bytes),
    /// Existing disk path that should be *moved* into the destination via
    /// rename (upload-tmp → final) and consumed.
    File(PathBuf),
    /// Existing disk path that should be *copied* into the destination and
    /// left in place. Used by replication so the source object stays
    /// available while the replica is produced.
    FileCopy(PathBuf),
}
277
/// Errors returned by [`S3Store`] implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Filesystem failure; converted automatically from `std::io::Error`
    /// through `?`.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A persisted document could not be parsed or produced; carries the
    /// underlying error rendered as text.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not available on this store implementation.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by a free-form message.
    #[error("{0}")]
    Other(String),
}
289
/// Result alias used by every [`S3Store`] operation.
pub type StoreResult<T> = Result<T, StoreError>;
291
/// Persistence backend for S3 state.
///
/// Implementations must be `Send + Sync`. This file provides
/// `MemoryS3Store` (persists nothing) and `DiskS3Store` (files under a root
/// directory).
pub trait S3Store: Send + Sync {
    /// Loads everything the store has persisted. `MemoryS3Store` always
    /// returns an empty state.
    fn load(&self) -> StoreResult<S3State>;

    /// Persists the bucket-level metadata for `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persists the raw `payload` text of one sub-resource of `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Removes one sub-resource of `bucket`. Both implementations in this
    /// file succeed when it does not exist.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Removes `bucket` and everything stored under it. Both implementations
    /// in this file succeed when it does not exist.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Stores an object body together with its metadata and returns a
    /// reference through which the body can be read back.
    ///
    /// `version` of `None` addresses the `"null"` version in `DiskS3Store`.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Rewrites only the metadata of an object version, leaving its body
    /// untouched.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Removes one object version (body and metadata).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Resolves a [`BodyRef`] to the bytes it refers to.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Records the start of a multipart upload.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Stores one part of a multipart upload.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<()>;
    /// Discards a multipart upload.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Finishes a multipart upload as the object `final_key` and returns a
    /// reference to the resulting body.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
342
/// Store that persists nothing: a stateless unit type whose `S3Store`
/// implementation keeps object bodies in `BodyRef::Memory`.
///
/// Derives `Debug` and `Clone` so it can be logged and duplicated like other
/// public types; `Default` is derived rather than hand-written.
#[derive(Debug, Clone, Default)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the store. Equivalent to `MemoryS3Store::default()`.
    pub fn new() -> Self {
        Self
    }
}
356
357impl S3Store for MemoryS3Store {
358    fn load(&self) -> StoreResult<S3State> {
359        Ok(S3State::default())
360    }
361
362    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
363        Ok(())
364    }
365    fn put_bucket_subresource(
366        &self,
367        _bucket: &str,
368        _kind: BucketSubresource,
369        _payload: &str,
370    ) -> StoreResult<()> {
371        Ok(())
372    }
373    fn delete_bucket_subresource(
374        &self,
375        _bucket: &str,
376        _kind: BucketSubresource,
377    ) -> StoreResult<()> {
378        Ok(())
379    }
380    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
381        Ok(())
382    }
383
384    fn put_object(
385        &self,
386        _bucket: &str,
387        _key: &str,
388        _version: Option<&str>,
389        body: BodySource,
390        _meta: &ObjectMeta,
391    ) -> StoreResult<BodyRef> {
392        match body {
393            BodySource::Bytes(b) => Ok(BodyRef::Memory(b)),
394            BodySource::File(_) | BodySource::FileCopy(_) => Err(StoreError::Other(
395                "file-backed put not supported in memory mode".to_string(),
396            )),
397        }
398    }
399    fn put_object_meta(
400        &self,
401        _bucket: &str,
402        _key: &str,
403        _version: Option<&str>,
404        _meta: &ObjectMeta,
405    ) -> StoreResult<()> {
406        Ok(())
407    }
408    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
409        Ok(())
410    }
411    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
412        match body {
413            BodyRef::Memory(b) => Ok(b.clone()),
414            BodyRef::Disk { .. } => {
415                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
416            }
417        }
418    }
419
420    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
421        Ok(())
422    }
423    fn mpu_put_part(
424        &self,
425        _bucket: &str,
426        _upload_id: &str,
427        _part_number: u32,
428        _body: BodySource,
429        _etag: &str,
430    ) -> StoreResult<()> {
431        Ok(())
432    }
433    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
434        Ok(())
435    }
436    fn mpu_complete(
437        &self,
438        _bucket: &str,
439        _upload_id: &str,
440        _final_key: &str,
441        _version: Option<&str>,
442        _meta: &ObjectMeta,
443    ) -> StoreResult<BodyRef> {
444        Ok(BodyRef::Memory(Bytes::new()))
445    }
446}
447
/// Store that persists S3 state as files under a root directory and serves
/// object bodies through a shared body cache.
pub struct DiskS3Store {
    /// Root directory; buckets live under `<root>/buckets`.
    root: PathBuf,
    /// Shared cache consulted before reading a body from disk.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
452
453impl DiskS3Store {
454    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
455        Self { root, cache }
456    }
457
458    fn buckets_dir(&self) -> PathBuf {
459        self.root.join("buckets")
460    }
461
462    fn bucket_dir(&self, bucket: &str) -> PathBuf {
463        self.buckets_dir()
464            .join(crate::key_escape::escape_key_segment(bucket))
465    }
466
467    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
468        self.bucket_dir(bucket)
469            .join("objects")
470            .join(crate::key_escape::escape_key_segment(key))
471    }
472
473    fn version_tag(version: Option<&str>) -> String {
474        version.unwrap_or("null").to_string()
475    }
476
477    fn object_paths(
478        &self,
479        bucket: &str,
480        key: &str,
481        version: Option<&str>,
482    ) -> (PathBuf, PathBuf, PathBuf) {
483        let dir = self.object_dir(bucket, key);
484        let tag = Self::version_tag(version);
485        let bin = dir.join(format!("{}.bin", tag));
486        let toml = dir.join(format!("{}.toml", tag));
487        (dir, bin, toml)
488    }
489
490    fn subresource_filename(kind: BucketSubresource) -> &'static str {
491        match kind {
492            BucketSubresource::Tags => "tags.toml",
493            BucketSubresource::Lifecycle => "lifecycle.toml",
494            BucketSubresource::Cors => "cors.toml",
495            BucketSubresource::Policy => "policy.toml",
496            BucketSubresource::Notification => "notification.toml",
497            BucketSubresource::Logging => "logging.toml",
498            BucketSubresource::Website => "website.toml",
499            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
500            BucketSubresource::ObjectLock => "object_lock.toml",
501            BucketSubresource::Replication => "replication.toml",
502            BucketSubresource::Ownership => "ownership.toml",
503            BucketSubresource::Inventory => "inventory.toml",
504            BucketSubresource::Encryption => "encryption.toml",
505            BucketSubresource::Versioning => "versioning.toml",
506            BucketSubresource::Acl => "acl.toml",
507            BucketSubresource::Accelerate => "accelerate.toml",
508        }
509    }
510
511    fn cleanup_empty(dir: &std::path::Path) {
512        let _ = std::fs::remove_dir(dir);
513    }
514
515    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
516        self.bucket_dir(bucket)
517            .join("mpu")
518            .join(crate::key_escape::escape_key_segment(upload_id))
519    }
520
521    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
522        self.mpu_dir(bucket, upload_id).join("parts")
523    }
524
525    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
526        self.mpu_parts_dir(bucket, upload_id)
527            .join(format!("{}.bin", part_number))
528    }
529
530    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
531        self.mpu_parts_dir(bucket, upload_id)
532            .join(format!("{}.toml", part_number))
533    }
534
535    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
536        crate::cache::BodyKey::new(
537            bucket.to_string(),
538            format!("__mpu__/{}", upload_id),
539            Some(format!("part-{}", part_number)),
540        )
541    }
542}
543
544fn io_other(msg: impl Into<String>) -> StoreError {
545    StoreError::Other(msg.into())
546}
547
548impl S3Store for DiskS3Store {
549    fn load(&self) -> StoreResult<S3State> {
550        let mut state = S3State::default();
551        let buckets_dir = self.buckets_dir();
552        if !buckets_dir.exists() {
553            return Ok(state);
554        }
555        for entry in std::fs::read_dir(&buckets_dir)? {
556            let entry = entry?;
557            if !entry.file_type()?.is_dir() {
558                continue;
559            }
560            let bdir = entry.path();
561            let meta_path = bdir.join("meta.toml");
562            if !meta_path.exists() {
563                continue;
564            }
565            let meta_text = std::fs::read_to_string(&meta_path)?;
566            let mut meta: BucketMeta =
567                toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
568            let mut snap = BucketSnapshot {
569                meta: meta.clone(),
570                objects: HashMap::new(),
571                object_versions: HashMap::new(),
572                subresources: HashMap::new(),
573                multipart_uploads: HashMap::new(),
574            };
575
576            for kind in ALL_SUBRESOURCES {
577                let fname = Self::subresource_filename(*kind);
578                let path = bdir.join(fname);
579                if path.exists() {
580                    let text = std::fs::read_to_string(&path)?;
581                    if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
582                        let stripped = text.trim();
583                        if !stripped.is_empty() {
584                            snap.meta.versioning = Some(stripped.to_string());
585                            meta.versioning = snap.meta.versioning.clone();
586                        }
587                    }
588                    snap.subresources.insert(fname.to_string(), text);
589                }
590            }
591
592            let objects_root = bdir.join("objects");
593            if objects_root.exists() {
594                for okey_entry in std::fs::read_dir(&objects_root)? {
595                    let okey_entry = okey_entry?;
596                    if !okey_entry.file_type()?.is_dir() {
597                        continue;
598                    }
599                    let key_dir = okey_entry.path();
600                    let mut versioned: Vec<LoadedObject> = Vec::new();
601                    let mut key_name: Option<String> = None;
602                    for version_entry in std::fs::read_dir(&key_dir)? {
603                        let version_entry = version_entry?;
604                        let path = version_entry.path();
605                        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
606                            continue;
607                        };
608                        if !fname.ends_with(".toml") {
609                            continue;
610                        }
611                        let version_tag = &fname[..fname.len() - 5];
612                        let toml_text = std::fs::read_to_string(&path)?;
613                        let obj_meta: ObjectMeta = toml::from_str(&toml_text)
614                            .map_err(|e| StoreError::Serde(e.to_string()))?;
615                        let bin_path = key_dir.join(format!("{}.bin", version_tag));
616                        let (body, size) = if obj_meta.is_delete_marker {
617                            (BodyRef::Memory(Bytes::new()), 0u64)
618                        } else if bin_path.exists() {
619                            let sz = std::fs::metadata(&bin_path)?.len();
620                            (
621                                BodyRef::Disk {
622                                    bucket: meta.name.clone(),
623                                    key: obj_meta.key.clone(),
624                                    version: if version_tag == "null" {
625                                        None
626                                    } else {
627                                        Some(version_tag.to_string())
628                                    },
629                                    path: bin_path,
630                                    size: sz,
631                                },
632                                sz,
633                            )
634                        } else {
635                            // Fail loud: the sidecar says this object has a
636                            // body but the .bin file is missing. Returning
637                            // silently would hand the caller a truncated
638                            // object and hide data loss.
639                            return Err(StoreError::Other(format!(
640                                "missing body file: {}",
641                                bin_path.display()
642                            )));
643                        };
644                        let _ = size;
645                        key_name.get_or_insert_with(|| obj_meta.key.clone());
646                        if version_tag == "null" && obj_meta.version_id.is_none() {
647                            snap.objects.insert(
648                                obj_meta.key.clone(),
649                                LoadedObject {
650                                    meta: obj_meta,
651                                    body,
652                                },
653                            );
654                        } else {
655                            versioned.push(LoadedObject {
656                                meta: obj_meta,
657                                body,
658                            });
659                        }
660                    }
661                    if !versioned.is_empty() {
662                        versioned.sort_by_key(|v| v.meta.last_modified);
663                        if let Some(key) = key_name {
664                            // Reconcile snap.objects with the newest version:
665                            // a trailing delete marker hides any prior null or
666                            // live version (remove it); otherwise overwrite
667                            // with the newest live version, even if a
668                            // pre-versioning null.toml had already been
669                            // inserted during the non-versioned scan.
670                            match versioned.last() {
671                                Some(newest) if newest.meta.is_delete_marker => {
672                                    snap.objects.remove(&key);
673                                }
674                                Some(newest) => {
675                                    snap.objects.insert(key.clone(), newest.clone());
676                                }
677                                None => {}
678                            }
679                            snap.object_versions.insert(key, versioned);
680                        }
681                    }
682                }
683            }
684
685            let mpu_root = bdir.join("mpu");
686            if mpu_root.exists() {
687                for upload_entry in std::fs::read_dir(&mpu_root)? {
688                    let upload_entry = upload_entry?;
689                    if !upload_entry.file_type()?.is_dir() {
690                        continue;
691                    }
692                    let upload_dir = upload_entry.path();
693                    let init_path = upload_dir.join("init.toml");
694                    if !init_path.exists() {
695                        continue;
696                    }
697                    let init_text = std::fs::read_to_string(&init_path)?;
698                    let init: MpuInit =
699                        toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
700                    let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
701                    let parts_dir = upload_dir.join("parts");
702                    if parts_dir.exists() {
703                        for part_entry in std::fs::read_dir(&parts_dir)? {
704                            let part_entry = part_entry?;
705                            let path = part_entry.path();
706                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
707                                continue;
708                            };
709                            if !fname.ends_with(".toml") {
710                                continue;
711                            }
712                            let stem = &fname[..fname.len() - 5];
713                            let Ok(part_number) = stem.parse::<u32>() else {
714                                continue;
715                            };
716                            let toml_text = std::fs::read_to_string(&path)?;
717                            let part_meta: UploadPartMeta = toml::from_str(&toml_text)
718                                .map_err(|e| StoreError::Serde(e.to_string()))?;
719                            let bin_path = parts_dir.join(format!("{}.bin", part_number));
720                            if !bin_path.exists() {
721                                return Err(StoreError::Other(format!(
722                                    "missing multipart part body file: {}",
723                                    bin_path.display()
724                                )));
725                            }
726                            let sz = std::fs::metadata(&bin_path)?.len();
727                            let body = BodyRef::Disk {
728                                bucket: meta.name.clone(),
729                                key: format!("__mpu__/{}", init.upload_id),
730                                version: Some(format!("part-{}", part_number)),
731                                path: bin_path,
732                                size: sz,
733                            };
734                            loaded_parts.insert(
735                                part_number,
736                                LoadedPart {
737                                    meta: part_meta,
738                                    body,
739                                },
740                            );
741                        }
742                    }
743                    snap.multipart_uploads.insert(
744                        init.upload_id.clone(),
745                        LoadedMpu {
746                            init,
747                            parts: loaded_parts,
748                        },
749                    );
750                }
751            }
752
753            state.buckets.insert(meta.name.clone(), snap);
754        }
755        Ok(state)
756    }
757
758    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
759        let dir = self.bucket_dir(bucket);
760        std::fs::create_dir_all(&dir)?;
761        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
762        Ok(())
763    }
764
765    fn put_bucket_subresource(
766        &self,
767        bucket: &str,
768        kind: BucketSubresource,
769        payload: &str,
770    ) -> StoreResult<()> {
771        let dir = self.bucket_dir(bucket);
772        std::fs::create_dir_all(&dir)?;
773        let path = dir.join(Self::subresource_filename(kind));
774        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
775        Ok(())
776    }
777
778    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
779        let path = self
780            .bucket_dir(bucket)
781            .join(Self::subresource_filename(kind));
782        match std::fs::remove_file(&path) {
783            Ok(_) => Ok(()),
784            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
785            Err(e) => Err(e.into()),
786        }
787    }
788
789    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
790        let dir = self.bucket_dir(bucket);
791        match std::fs::remove_dir_all(&dir) {
792            Ok(_) => Ok(()),
793            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
794            Err(e) => Err(e.into()),
795        }
796    }
797
798    fn put_object(
799        &self,
800        bucket: &str,
801        key: &str,
802        version: Option<&str>,
803        body: BodySource,
804        meta: &ObjectMeta,
805    ) -> StoreResult<BodyRef> {
806        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
807        std::fs::create_dir_all(&dir)?;
808
809        if meta.is_delete_marker {
810            crate::atomic::write_atomic_toml(&toml_path, meta)?;
811            return Ok(BodyRef::Memory(Bytes::new()));
812        }
813
814        let size: u64;
815        let bytes_for_cache: Option<Bytes>;
816        match body {
817            BodySource::Bytes(b) => {
818                size = b.len() as u64;
819                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
820                bytes_for_cache = Some(b);
821            }
822            BodySource::File(src) => {
823                let src_size = std::fs::metadata(&src)?.len();
824                size = src_size;
825                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
826                bytes_for_cache = None;
827            }
828            BodySource::FileCopy(src) => {
829                let src_size = std::fs::metadata(&src)?.len();
830                size = src_size;
831                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
832                bytes_for_cache = None;
833            }
834        }
835
836        crate::atomic::write_atomic_toml(&toml_path, meta)?;
837
838        let body_key = crate::cache::BodyKey::new(
839            bucket.to_string(),
840            key.to_string(),
841            version.map(|s| s.to_string()),
842        );
843        if let Some(b) = bytes_for_cache {
844            self.cache.insert(body_key, b);
845        } else {
846            self.cache.invalidate(&crate::cache::BodyKey::new(
847                bucket.to_string(),
848                key.to_string(),
849                version.map(|s| s.to_string()),
850            ));
851        }
852
853        Ok(BodyRef::Disk {
854            bucket: bucket.to_string(),
855            key: key.to_string(),
856            version: version.map(|s| s.to_string()),
857            path: bin_path,
858            size,
859        })
860    }
861
862    fn put_object_meta(
863        &self,
864        bucket: &str,
865        key: &str,
866        version: Option<&str>,
867        meta: &ObjectMeta,
868    ) -> StoreResult<()> {
869        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
870        std::fs::create_dir_all(&dir)?;
871        crate::atomic::write_atomic_toml(&toml_path, meta)?;
872        Ok(())
873    }
874
875    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
876        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
877        for p in [&bin_path, &toml_path] {
878            match std::fs::remove_file(p) {
879                Ok(_) => {}
880                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
881                Err(e) => return Err(e.into()),
882            }
883        }
884        Self::cleanup_empty(&dir);
885
886        self.cache.invalidate(&crate::cache::BodyKey::new(
887            bucket.to_string(),
888            key.to_string(),
889            version.map(|s| s.to_string()),
890        ));
891        Ok(())
892    }
893
894    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
895        match body {
896            BodyRef::Memory(b) => Ok(b.clone()),
897            BodyRef::Disk {
898                bucket,
899                key,
900                version,
901                path,
902                size: _,
903            } => {
904                let body_key =
905                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
906                if let Some(bytes) = self.cache.get(&body_key) {
907                    return Ok(bytes);
908                }
909                let bytes = Bytes::from(std::fs::read(path)?);
910                self.cache.insert(body_key, bytes.clone());
911                Ok(bytes)
912            }
913        }
914    }
915
916    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
917        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
918        std::fs::create_dir_all(&parts_dir)?;
919        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
920        crate::atomic::write_atomic_toml(&init_path, init)?;
921        Ok(())
922    }
923
924    fn mpu_put_part(
925        &self,
926        bucket: &str,
927        upload_id: &str,
928        part_number: u32,
929        body: BodySource,
930        etag: &str,
931    ) -> StoreResult<()> {
932        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
933        std::fs::create_dir_all(&parts_dir)?;
934        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
935        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
936
937        let size: u64 = match body {
938            BodySource::Bytes(b) => {
939                let n = b.len() as u64;
940                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
941                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
942                self.cache.insert(cache_key, b);
943                n
944            }
945            BodySource::File(src) => {
946                let n = std::fs::metadata(&src)?.len();
947                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
948                self.cache
949                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
950                n
951            }
952            BodySource::FileCopy(src) => {
953                let n = std::fs::metadata(&src)?.len();
954                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
955                self.cache
956                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
957                n
958            }
959        };
960
961        let meta = UploadPartMeta {
962            part_number,
963            etag: etag.to_string(),
964            size,
965            last_modified: Utc::now(),
966        };
967        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
968        Ok(())
969    }
970
971    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
972        let dir = self.mpu_dir(bucket, upload_id);
973        // Invalidate any cached part bodies for this upload.
974        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
975            for entry in entries.flatten() {
976                let path = entry.path();
977                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
978                    if let Some(stem) = fname.strip_suffix(".bin") {
979                        if let Ok(n) = stem.parse::<u32>() {
980                            self.cache
981                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
982                        }
983                    }
984                }
985            }
986        }
987        match std::fs::remove_dir_all(&dir) {
988            Ok(_) => Ok(()),
989            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
990            Err(e) => Err(e.into()),
991        }
992    }
993
994    fn mpu_complete(
995        &self,
996        bucket: &str,
997        upload_id: &str,
998        final_key: &str,
999        version: Option<&str>,
1000        meta: &ObjectMeta,
1001    ) -> StoreResult<BodyRef> {
1002        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1003        if !parts_dir.exists() {
1004            return Err(io_other(format!(
1005                "mpu_complete: no parts dir for upload {}",
1006                upload_id
1007            )));
1008        }
1009
1010        // Enumerate parts in ascending part-number order.
1011        let mut part_numbers: Vec<u32> = Vec::new();
1012        for entry in std::fs::read_dir(&parts_dir)? {
1013            let entry = entry?;
1014            let path = entry.path();
1015            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1016                continue;
1017            };
1018            if let Some(stem) = fname.strip_suffix(".toml") {
1019                if let Ok(n) = stem.parse::<u32>() {
1020                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1021                        part_numbers.push(n);
1022                    }
1023                }
1024            }
1025        }
1026        part_numbers.sort_unstable();
1027        if part_numbers.is_empty() {
1028            return Err(io_other(format!(
1029                "mpu_complete: upload {} has no parts",
1030                upload_id
1031            )));
1032        }
1033
1034        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1035        std::fs::create_dir_all(&dir)?;
1036
1037        let total_size: u64 = if part_numbers.len() == 1 {
1038            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1039            let sz = std::fs::metadata(&only)?.len();
1040            match std::fs::rename(&only, &bin_path) {
1041                Ok(_) => {}
1042                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1043                    // Cross-device rename: fall back to streaming copy then remove source.
1044                    {
1045                        let mut input = std::fs::File::open(&only)?;
1046                        let tmp = {
1047                            let mut os = bin_path.as_os_str().to_owned();
1048                            os.push(".tmp");
1049                            PathBuf::from(os)
1050                        };
1051                        let mut out = std::fs::OpenOptions::new()
1052                            .write(true)
1053                            .create(true)
1054                            .truncate(true)
1055                            .open(&tmp)?;
1056                        std::io::copy(&mut input, &mut out)?;
1057                        out.sync_all()?;
1058                        std::fs::rename(&tmp, &bin_path)?;
1059                    }
1060                    let _ = std::fs::remove_file(&only);
1061                }
1062                Err(e) => return Err(e.into()),
1063            }
1064            sz
1065        } else {
1066            let tmp = {
1067                let mut os = bin_path.as_os_str().to_owned();
1068                os.push(".tmp");
1069                PathBuf::from(os)
1070            };
1071            let mut total: u64 = 0;
1072            {
1073                let mut out = std::fs::OpenOptions::new()
1074                    .write(true)
1075                    .create(true)
1076                    .truncate(true)
1077                    .open(&tmp)?;
1078                for n in &part_numbers {
1079                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1080                    let mut input = std::fs::File::open(&part_path)?;
1081                    let copied = std::io::copy(&mut input, &mut out)?;
1082                    total += copied;
1083                }
1084                out.sync_all()?;
1085            }
1086            std::fs::rename(&tmp, &bin_path)?;
1087            if let Some(parent) = bin_path.parent() {
1088                if let Ok(dir_handle) = std::fs::File::open(parent) {
1089                    let _ = dir_handle.sync_all();
1090                }
1091            }
1092            total
1093        };
1094
1095        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1096
1097        // Invalidate per-part cache entries and drop the mpu dir.
1098        for n in &part_numbers {
1099            self.cache
1100                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1101        }
1102        let mpu_dir = self.mpu_dir(bucket, upload_id);
1103        let _ = std::fs::remove_dir_all(&mpu_dir);
1104
1105        // The concatenated body is deliberately NOT re-inserted into the cache —
1106        // large multipart uploads typically exceed the single-object cap, and we
1107        // must never round-trip the assembled body through RAM. The next
1108        // open_object_body call will load through the normal cache path.
1109        self.cache.invalidate(&crate::cache::BodyKey::new(
1110            bucket.to_string(),
1111            final_key.to_string(),
1112            version.map(|s| s.to_string()),
1113        ));
1114
1115        Ok(BodyRef::Disk {
1116            bucket: bucket.to_string(),
1117            key: final_key.to_string(),
1118            version: version.map(|s| s.to_string()),
1119            path: bin_path,
1120            size: total_size,
1121        })
1122    }
1123}
1124
/// Raw OS error code reported when a rename crosses filesystems/volumes.
///
/// Unix-like targets report `EXDEV` (18). Windows reports
/// `ERROR_NOT_SAME_DEVICE` (17), so comparing against 18 there would never
/// match and the copy fallback would be unreachable.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17
    } else {
        18
    }
}
1128
1129#[cfg(test)]
1130mod disk_tests {
1131    use super::*;
1132    use std::sync::Arc;
1133    use tempfile::TempDir;
1134
1135    fn new_store(tmp: &TempDir) -> DiskS3Store {
1136        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1137        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1138    }
1139
1140    fn new_store_with_cache(
1141        tmp: &TempDir,
1142        cap: u64,
1143    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1144        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1145        (
1146            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1147            cache,
1148        )
1149    }
1150
1151    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1152        ObjectMeta {
1153            key: key.to_string(),
1154            content_type: "application/octet-stream".to_string(),
1155            etag: "etag".to_string(),
1156            size,
1157            ..Default::default()
1158        }
1159    }
1160
1161    #[test]
1162    fn put_bucket_meta_roundtrip() {
1163        let tmp = TempDir::new().unwrap();
1164        let store = new_store(&tmp);
1165        let meta = BucketMeta {
1166            name: "b1".to_string(),
1167            region: "us-east-1".to_string(),
1168            versioning: Some("Enabled".to_string()),
1169            ..Default::default()
1170        };
1171        store.put_bucket_meta("b1", &meta).unwrap();
1172        let loaded = store.load().unwrap();
1173        let snap = loaded.buckets.get("b1").unwrap();
1174        assert_eq!(snap.meta.name, "b1");
1175        assert_eq!(snap.meta.region, "us-east-1");
1176        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1177    }
1178
1179    #[test]
1180    fn put_bucket_subresource_each_variant_writes_file() {
1181        let tmp = TempDir::new().unwrap();
1182        let store = new_store(&tmp);
1183        store
1184            .put_bucket_meta(
1185                "b",
1186                &BucketMeta {
1187                    name: "b".to_string(),
1188                    ..Default::default()
1189                },
1190            )
1191            .unwrap();
1192        let variants = [
1193            BucketSubresource::Tags,
1194            BucketSubresource::Lifecycle,
1195            BucketSubresource::Cors,
1196            BucketSubresource::Policy,
1197            BucketSubresource::Notification,
1198            BucketSubresource::Logging,
1199            BucketSubresource::Website,
1200            BucketSubresource::PublicAccessBlock,
1201            BucketSubresource::ObjectLock,
1202            BucketSubresource::Replication,
1203            BucketSubresource::Ownership,
1204            BucketSubresource::Inventory,
1205            BucketSubresource::Encryption,
1206            BucketSubresource::Versioning,
1207            BucketSubresource::Acl,
1208            BucketSubresource::Accelerate,
1209        ];
1210        for v in variants {
1211            store
1212                .put_bucket_subresource("b", v, "payload=true")
1213                .unwrap();
1214            let file = store
1215                .bucket_dir("b")
1216                .join(DiskS3Store::subresource_filename(v));
1217            assert!(file.exists(), "{:?}", v);
1218            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1219        }
1220    }
1221
1222    #[test]
1223    fn delete_bucket_subresource_removes_file() {
1224        let tmp = TempDir::new().unwrap();
1225        let store = new_store(&tmp);
1226        store
1227            .put_bucket_meta(
1228                "b",
1229                &BucketMeta {
1230                    name: "b".to_string(),
1231                    ..Default::default()
1232                },
1233            )
1234            .unwrap();
1235        store
1236            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1237            .unwrap();
1238        store
1239            .delete_bucket_subresource("b", BucketSubresource::Tags)
1240            .unwrap();
1241        let file = store.bucket_dir("b").join("tags.toml");
1242        assert!(!file.exists());
1243        // idempotent
1244        store
1245            .delete_bucket_subresource("b", BucketSubresource::Tags)
1246            .unwrap();
1247    }
1248
1249    #[test]
1250    fn put_object_bytes_roundtrip() {
1251        let tmp = TempDir::new().unwrap();
1252        let store = new_store(&tmp);
1253        store
1254            .put_bucket_meta(
1255                "b",
1256                &BucketMeta {
1257                    name: "b".to_string(),
1258                    ..Default::default()
1259                },
1260            )
1261            .unwrap();
1262        let data = Bytes::from_static(b"hello world");
1263        let meta = sample_meta("k1", data.len() as u64);
1264        let body_ref = store
1265            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1266            .unwrap();
1267        match &body_ref {
1268            BodyRef::Disk {
1269                bucket,
1270                key,
1271                size,
1272                path,
1273                ..
1274            } => {
1275                assert_eq!(bucket, "b");
1276                assert_eq!(key, "k1");
1277                assert_eq!(*size, data.len() as u64);
1278                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1279            }
1280            _ => panic!("expected Disk"),
1281        }
1282        let loaded = store.load().unwrap();
1283        let snap = loaded.buckets.get("b").unwrap();
1284        let obj = snap.objects.get("k1").unwrap();
1285        assert_eq!(obj.meta.size, data.len() as u64);
1286    }
1287
1288    #[test]
1289    fn put_object_file_copy_source_leaves_src_in_place() {
1290        let tmp = TempDir::new().unwrap();
1291        let store = new_store(&tmp);
1292        store
1293            .put_bucket_meta(
1294                "b",
1295                &BucketMeta {
1296                    name: "b".to_string(),
1297                    ..Default::default()
1298                },
1299            )
1300            .unwrap();
1301        let src_dir = TempDir::new().unwrap();
1302        let src = src_dir.path().join("src.bin");
1303        std::fs::write(&src, b"copied-body").unwrap();
1304        let meta = sample_meta("k", 11);
1305        let bref = store
1306            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1307            .unwrap();
1308        match bref {
1309            BodyRef::Disk { path, size, .. } => {
1310                assert_eq!(size, 11);
1311                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1312            }
1313            _ => panic!("expected Disk bodyref"),
1314        }
1315        assert!(src.exists(), "source file must not be moved by FileCopy");
1316        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1317    }
1318
1319    #[test]
1320    fn put_object_file_source() {
1321        let tmp = TempDir::new().unwrap();
1322        let store = new_store(&tmp);
1323        store
1324            .put_bucket_meta(
1325                "b",
1326                &BucketMeta {
1327                    name: "b".to_string(),
1328                    ..Default::default()
1329                },
1330            )
1331            .unwrap();
1332        let src = tmp.path().join("src.bin");
1333        std::fs::write(&src, b"file-body").unwrap();
1334        let meta = sample_meta("k", 9);
1335        let body_ref = store
1336            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1337            .unwrap();
1338        let path = match body_ref {
1339            BodyRef::Disk { path, .. } => path,
1340            _ => panic!(),
1341        };
1342        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1343    }
1344
1345    #[test]
1346    fn put_object_meta_only_keeps_bin() {
1347        let tmp = TempDir::new().unwrap();
1348        let store = new_store(&tmp);
1349        store
1350            .put_bucket_meta(
1351                "b",
1352                &BucketMeta {
1353                    name: "b".to_string(),
1354                    ..Default::default()
1355                },
1356            )
1357            .unwrap();
1358        let data = Bytes::from_static(b"abc");
1359        let mut meta = sample_meta("k", 3);
1360        store
1361            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1362            .unwrap();
1363        let (_, bin, _) = store.object_paths("b", "k", None);
1364        let before = std::fs::read(&bin).unwrap();
1365        meta.tags.insert("x".to_string(), "y".to_string());
1366        store.put_object_meta("b", "k", None, &meta).unwrap();
1367        assert_eq!(std::fs::read(&bin).unwrap(), before);
1368        let loaded = store.load().unwrap();
1369        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1370        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1371    }
1372
1373    #[test]
1374    fn delete_object_cleans_up_files_and_cache() {
1375        let tmp = TempDir::new().unwrap();
1376        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1377        store
1378            .put_bucket_meta(
1379                "b",
1380                &BucketMeta {
1381                    name: "b".to_string(),
1382                    ..Default::default()
1383                },
1384            )
1385            .unwrap();
1386        let data = Bytes::from_static(b"bye");
1387        let meta = sample_meta("k", 3);
1388        store
1389            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1390            .unwrap();
1391        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1392        assert!(cache.get(&body_key).is_some());
1393        store.delete_object("b", "k", None).unwrap();
1394        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1395        assert!(!bin.exists());
1396        assert!(!toml_path.exists());
1397        assert!(!dir.exists());
1398        assert!(cache.get(&body_key).is_none());
1399    }
1400
1401    #[test]
1402    fn open_object_body_cache_hit_and_refill() {
1403        let tmp = TempDir::new().unwrap();
1404        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1405        store
1406            .put_bucket_meta(
1407                "b",
1408                &BucketMeta {
1409                    name: "b".to_string(),
1410                    ..Default::default()
1411                },
1412            )
1413            .unwrap();
1414        let data = Bytes::from_static(b"payload");
1415        let meta = sample_meta("k", data.len() as u64);
1416        let body_ref = store
1417            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1418            .unwrap();
1419        // Cache hit.
1420        let got = store.open_object_body(&body_ref).unwrap();
1421        assert_eq!(got, data);
1422        // Invalidate and re-read populates cache from disk.
1423        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1424        cache.invalidate(&body_key);
1425        assert!(cache.get(&body_key).is_none());
1426        let got = store.open_object_body(&body_ref).unwrap();
1427        assert_eq!(got, data);
1428        assert!(cache.get(&body_key).is_some());
1429    }
1430
1431    #[test]
1432    fn open_object_body_large_bypasses_cache() {
1433        let tmp = TempDir::new().unwrap();
1434        // capacity 1024 → single-object cap 512. Use 800-byte body.
1435        let (store, cache) = new_store_with_cache(&tmp, 1024);
1436        store
1437            .put_bucket_meta(
1438                "b",
1439                &BucketMeta {
1440                    name: "b".to_string(),
1441                    ..Default::default()
1442                },
1443            )
1444            .unwrap();
1445        let data = Bytes::from(vec![7u8; 800]);
1446        let meta = sample_meta("big", 800);
1447        let body_ref = store
1448            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1449            .unwrap();
1450        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1451        assert!(cache.get(&body_key).is_none());
1452        let got = store.open_object_body(&body_ref).unwrap();
1453        assert_eq!(got, data);
1454        // Still none — exceeds single-object cap.
1455        assert!(cache.get(&body_key).is_none());
1456    }
1457
1458    #[test]
1459    fn load_empty_dir() {
1460        let tmp = TempDir::new().unwrap();
1461        let store = new_store(&tmp);
1462        let s = store.load().unwrap();
1463        assert!(s.buckets.is_empty());
1464    }
1465
1466    #[test]
1467    fn load_skips_mpu_without_init() {
1468        let tmp = TempDir::new().unwrap();
1469        let store = new_store(&tmp);
1470        store
1471            .put_bucket_meta(
1472                "b",
1473                &BucketMeta {
1474                    name: "b".to_string(),
1475                    ..Default::default()
1476                },
1477            )
1478            .unwrap();
1479        let data = Bytes::from_static(b"x");
1480        let meta = sample_meta("k", 1);
1481        store
1482            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1483            .unwrap();
1484        // Directory with no init.toml is skipped by load.
1485        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1486        std::fs::create_dir_all(&mpu).unwrap();
1487
1488        let loaded = store.load().unwrap();
1489        let snap = loaded.buckets.get("b").unwrap();
1490        assert_eq!(snap.objects.len(), 1);
1491        assert!(snap.multipart_uploads.is_empty());
1492    }
1493
1494    #[test]
1495    fn load_reads_bucket_subresources() {
1496        let tmp = TempDir::new().unwrap();
1497        let store = new_store(&tmp);
1498        store
1499            .put_bucket_meta(
1500                "b",
1501                &BucketMeta {
1502                    name: "b".to_string(),
1503                    ..Default::default()
1504                },
1505            )
1506            .unwrap();
1507        store
1508            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1509            .unwrap();
1510        store
1511            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1512            .unwrap();
1513        store
1514            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1515            .unwrap();
1516        store
1517            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1518            .unwrap();
1519        let loaded = store.load().unwrap();
1520        let snap = loaded.buckets.get("b").unwrap();
1521        assert_eq!(
1522            snap.subresources.get("lifecycle.toml").map(String::as_str),
1523            Some("<Lifecycle/>"),
1524        );
1525        assert_eq!(
1526            snap.subresources.get("cors.toml").map(String::as_str),
1527            Some("<Cors/>"),
1528        );
1529        assert_eq!(
1530            snap.subresources.get("policy.toml").map(String::as_str),
1531            Some("{}"),
1532        );
1533        assert_eq!(
1534            snap.subresources.get("tags.toml").map(String::as_str),
1535            Some("x=1"),
1536        );
1537    }
1538
1539    #[test]
1540    fn versioned_put_load_roundtrip() {
1541        let tmp = TempDir::new().unwrap();
1542        let store = new_store(&tmp);
1543        store
1544            .put_bucket_meta(
1545                "b",
1546                &BucketMeta {
1547                    name: "b".to_string(),
1548                    versioning: Some("Enabled".to_string()),
1549                    ..Default::default()
1550                },
1551            )
1552            .unwrap();
1553        let base = chrono::Utc::now();
1554        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1555            .iter()
1556            .enumerate()
1557        {
1558            let mut m = sample_meta("k", body.len() as u64);
1559            m.version_id = Some((*vid).to_string());
1560            m.last_modified = base + chrono::Duration::seconds(i as i64);
1561            store
1562                .put_object(
1563                    "b",
1564                    "k",
1565                    Some(*vid),
1566                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1567                    &m,
1568                )
1569                .unwrap();
1570        }
1571        let loaded = store.load().unwrap();
1572        let snap = loaded.buckets.get("b").unwrap();
1573        let versions = snap.object_versions.get("k").expect("versions present");
1574        assert_eq!(versions.len(), 3);
1575        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1576        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1577        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1578        for v in versions {
1579            match &v.body {
1580                BodyRef::Disk { size, .. } => assert!(*size > 0),
1581                _ => panic!("expected Disk body"),
1582            }
1583        }
1584    }
1585
1586    #[test]
1587    fn versioned_load_promotes_latest_live_to_snap_objects() {
1588        let tmp = TempDir::new().unwrap();
1589        let store = new_store(&tmp);
1590        store
1591            .put_bucket_meta(
1592                "b",
1593                &BucketMeta {
1594                    name: "b".to_string(),
1595                    versioning: Some("Enabled".to_string()),
1596                    ..Default::default()
1597                },
1598            )
1599            .unwrap();
1600        let base = chrono::Utc::now();
1601        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1602            .iter()
1603            .enumerate()
1604        {
1605            let mut m = sample_meta("k", body.len() as u64);
1606            m.version_id = Some((*vid).to_string());
1607            m.last_modified = base + chrono::Duration::seconds(i as i64);
1608            store
1609                .put_object(
1610                    "b",
1611                    "k",
1612                    Some(*vid),
1613                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1614                    &m,
1615                )
1616                .unwrap();
1617        }
1618        let loaded = store.load().unwrap();
1619        let snap = loaded.buckets.get("b").unwrap();
1620        let current = snap.objects.get("k").expect("current object promoted");
1621        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1622    }
1623
1624    #[test]
1625    fn versioned_load_trailing_delete_marker_hides_current() {
1626        let tmp = TempDir::new().unwrap();
1627        let store = new_store(&tmp);
1628        store
1629            .put_bucket_meta(
1630                "b",
1631                &BucketMeta {
1632                    name: "b".to_string(),
1633                    versioning: Some("Enabled".to_string()),
1634                    ..Default::default()
1635                },
1636            )
1637            .unwrap();
1638        let base = chrono::Utc::now();
1639        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1640            .iter()
1641            .enumerate()
1642        {
1643            let mut m = sample_meta("k", body.len() as u64);
1644            m.version_id = Some((*vid).to_string());
1645            m.last_modified = base + chrono::Duration::seconds(i as i64);
1646            store
1647                .put_object(
1648                    "b",
1649                    "k",
1650                    Some(*vid),
1651                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1652                    &m,
1653                )
1654                .unwrap();
1655        }
1656        // Append a delete marker on top.
1657        let mut dm = sample_meta("k", 0);
1658        dm.version_id = Some("dm1".to_string());
1659        dm.is_delete_marker = true;
1660        dm.last_modified = base + chrono::Duration::seconds(10);
1661        store
1662            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1663            .unwrap();
1664        let loaded = store.load().unwrap();
1665        let snap = loaded.buckets.get("b").unwrap();
1666        assert!(
1667            !snap.objects.contains_key("k"),
1668            "trailing delete marker must hide current object",
1669        );
1670        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1671    }
1672
1673    #[test]
1674    fn legacy_null_object_overridden_by_newer_versions() {
1675        // A pre-versioning null put followed by versioning-enabled puts must
1676        // see snap.objects reflect the newest live version, not the stale null.
1677        let tmp = TempDir::new().unwrap();
1678        let store = new_store(&tmp);
1679        store
1680            .put_bucket_meta(
1681                "b",
1682                &BucketMeta {
1683                    name: "b".to_string(),
1684                    ..Default::default()
1685                },
1686            )
1687            .unwrap();
1688        let base = chrono::Utc::now();
1689        let mut null_meta = sample_meta("k", 3);
1690        null_meta.last_modified = base;
1691        store
1692            .put_object(
1693                "b",
1694                "k",
1695                None,
1696                BodySource::Bytes(Bytes::from_static(b"old")),
1697                &null_meta,
1698            )
1699            .unwrap();
1700        // Enable versioning and put two versions, the second a delete.
1701        store
1702            .put_bucket_meta(
1703                "b",
1704                &BucketMeta {
1705                    name: "b".to_string(),
1706                    versioning: Some("Enabled".to_string()),
1707                    ..Default::default()
1708                },
1709            )
1710            .unwrap();
1711        let mut v1 = sample_meta("k", 3);
1712        v1.version_id = Some("v1".to_string());
1713        v1.last_modified = base + chrono::Duration::seconds(1);
1714        store
1715            .put_object(
1716                "b",
1717                "k",
1718                Some("v1"),
1719                BodySource::Bytes(Bytes::from_static(b"new")),
1720                &v1,
1721            )
1722            .unwrap();
1723        let mut v2 = sample_meta("k", 5);
1724        v2.version_id = Some("v2".to_string());
1725        v2.last_modified = base + chrono::Duration::seconds(2);
1726        store
1727            .put_object(
1728                "b",
1729                "k",
1730                Some("v2"),
1731                BodySource::Bytes(Bytes::from_static(b"newer")),
1732                &v2,
1733            )
1734            .unwrap();
1735        let loaded = store.load().unwrap();
1736        let snap = loaded.buckets.get("b").unwrap();
1737        let current = snap
1738            .objects
1739            .get("k")
1740            .expect("latest live version must override stale null");
1741        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1742        assert_eq!(current.meta.size, 5);
1743    }
1744
1745    #[test]
1746    fn legacy_null_hidden_by_trailing_delete_marker() {
1747        let tmp = TempDir::new().unwrap();
1748        let store = new_store(&tmp);
1749        store
1750            .put_bucket_meta(
1751                "b",
1752                &BucketMeta {
1753                    name: "b".to_string(),
1754                    ..Default::default()
1755                },
1756            )
1757            .unwrap();
1758        let base = chrono::Utc::now();
1759        let mut null_meta = sample_meta("k", 3);
1760        null_meta.last_modified = base;
1761        store
1762            .put_object(
1763                "b",
1764                "k",
1765                None,
1766                BodySource::Bytes(Bytes::from_static(b"old")),
1767                &null_meta,
1768            )
1769            .unwrap();
1770        store
1771            .put_bucket_meta(
1772                "b",
1773                &BucketMeta {
1774                    name: "b".to_string(),
1775                    versioning: Some("Enabled".to_string()),
1776                    ..Default::default()
1777                },
1778            )
1779            .unwrap();
1780        let mut v1 = sample_meta("k", 3);
1781        v1.version_id = Some("v1".to_string());
1782        v1.last_modified = base + chrono::Duration::seconds(1);
1783        store
1784            .put_object(
1785                "b",
1786                "k",
1787                Some("v1"),
1788                BodySource::Bytes(Bytes::from_static(b"new")),
1789                &v1,
1790            )
1791            .unwrap();
1792        let mut dm = sample_meta("k", 0);
1793        dm.version_id = Some("dm1".to_string());
1794        dm.is_delete_marker = true;
1795        dm.last_modified = base + chrono::Duration::seconds(2);
1796        store
1797            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1798            .unwrap();
1799        let loaded = store.load().unwrap();
1800        let snap = loaded.buckets.get("b").unwrap();
1801        assert!(
1802            !snap.objects.contains_key("k"),
1803            "trailing delete marker must hide even a legacy null object",
1804        );
1805    }
1806
1807    #[test]
1808    fn delete_marker_roundtrip_no_body_file() {
1809        let tmp = TempDir::new().unwrap();
1810        let store = new_store(&tmp);
1811        store
1812            .put_bucket_meta(
1813                "b",
1814                &BucketMeta {
1815                    name: "b".to_string(),
1816                    versioning: Some("Enabled".to_string()),
1817                    ..Default::default()
1818                },
1819            )
1820            .unwrap();
1821        let mut m = sample_meta("k", 0);
1822        m.version_id = Some("dm1".to_string());
1823        m.is_delete_marker = true;
1824        store
1825            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1826            .unwrap();
1827        // No .bin file on disk for delete markers.
1828        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1829        assert!(!bin.exists(), "delete marker must not have a .bin file");
1830        assert!(toml_path.exists());
1831
1832        let loaded = store.load().unwrap();
1833        let versions = loaded
1834            .buckets
1835            .get("b")
1836            .unwrap()
1837            .object_versions
1838            .get("k")
1839            .unwrap();
1840        assert_eq!(versions.len(), 1);
1841        assert!(versions[0].meta.is_delete_marker);
1842        match &versions[0].body {
1843            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1844            _ => panic!("delete marker body should be empty Memory"),
1845        }
1846    }
1847
1848    #[test]
1849    fn mixed_nonversioned_and_versioned_buckets() {
1850        let tmp = TempDir::new().unwrap();
1851        let store = new_store(&tmp);
1852        store
1853            .put_bucket_meta(
1854                "a",
1855                &BucketMeta {
1856                    name: "a".to_string(),
1857                    ..Default::default()
1858                },
1859            )
1860            .unwrap();
1861        store
1862            .put_bucket_meta(
1863                "b",
1864                &BucketMeta {
1865                    name: "b".to_string(),
1866                    versioning: Some("Enabled".to_string()),
1867                    ..Default::default()
1868                },
1869            )
1870            .unwrap();
1871        let ma = sample_meta("only", 3);
1872        store
1873            .put_object(
1874                "a",
1875                "only",
1876                None,
1877                BodySource::Bytes(Bytes::from_static(b"aaa")),
1878                &ma,
1879            )
1880            .unwrap();
1881        let base = chrono::Utc::now();
1882        for (i, vid) in ["v1", "v2"].iter().enumerate() {
1883            let mut m = sample_meta("twice", 2);
1884            m.version_id = Some((*vid).to_string());
1885            m.last_modified = base + chrono::Duration::seconds(i as i64);
1886            store
1887                .put_object(
1888                    "b",
1889                    "twice",
1890                    Some(*vid),
1891                    BodySource::Bytes(Bytes::from_static(b"xx")),
1892                    &m,
1893                )
1894                .unwrap();
1895        }
1896        let loaded = store.load().unwrap();
1897        assert_eq!(loaded.buckets.len(), 2);
1898        let a = loaded.buckets.get("a").unwrap();
1899        assert_eq!(a.objects.len(), 1);
1900        assert!(a.object_versions.is_empty());
1901        let b = loaded.buckets.get("b").unwrap();
1902        // Fix #5: the latest live version is promoted into objects so
1903        // unversioned GETs see it post-restart.
1904        assert_eq!(
1905            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
1906            Some("v2")
1907        );
1908        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
1909    }
1910
1911    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1912        MpuInit {
1913            upload_id: upload_id.to_string(),
1914            key: key.to_string(),
1915            initiated: chrono::Utc::now(),
1916            content_type: "application/octet-stream".to_string(),
1917            storage_class: "STANDARD".to_string(),
1918            ..Default::default()
1919        }
1920    }
1921
1922    #[test]
1923    fn mpu_create_then_load_empty_parts() {
1924        let tmp = TempDir::new().unwrap();
1925        let store = new_store(&tmp);
1926        store
1927            .put_bucket_meta(
1928                "b",
1929                &BucketMeta {
1930                    name: "b".to_string(),
1931                    ..Default::default()
1932                },
1933            )
1934            .unwrap();
1935        let init = sample_mpu_init("up1", "k1");
1936        store.mpu_create("b", "up1", &init).unwrap();
1937        let loaded = store.load().unwrap();
1938        let snap = loaded.buckets.get("b").unwrap();
1939        let m = snap.multipart_uploads.get("up1").expect("upload present");
1940        assert_eq!(m.init.upload_id, "up1");
1941        assert_eq!(m.init.key, "k1");
1942        assert!(m.parts.is_empty());
1943    }
1944
1945    #[test]
1946    fn mpu_put_part_then_load_three_parts() {
1947        let tmp = TempDir::new().unwrap();
1948        let store = new_store(&tmp);
1949        store
1950            .put_bucket_meta(
1951                "b",
1952                &BucketMeta {
1953                    name: "b".to_string(),
1954                    ..Default::default()
1955                },
1956            )
1957            .unwrap();
1958        store
1959            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
1960            .unwrap();
1961        let bodies = [
1962            Bytes::from_static(b"part-one"),
1963            Bytes::from_static(b"part-two-longer"),
1964            Bytes::from_static(b"p3"),
1965        ];
1966        for (i, body) in bodies.iter().enumerate() {
1967            let n = (i + 1) as u32;
1968            store
1969                .mpu_put_part(
1970                    "b",
1971                    "up",
1972                    n,
1973                    BodySource::Bytes(body.clone()),
1974                    &format!("et{}", n),
1975                )
1976                .unwrap();
1977        }
1978        let loaded = store.load().unwrap();
1979        let snap = loaded.buckets.get("b").unwrap();
1980        let m = snap.multipart_uploads.get("up").unwrap();
1981        assert_eq!(m.parts.len(), 3);
1982        for (i, body) in bodies.iter().enumerate() {
1983            let n = (i + 1) as u32;
1984            let part = m.parts.get(&n).unwrap();
1985            assert_eq!(part.meta.part_number, n);
1986            assert_eq!(part.meta.size, body.len() as u64);
1987            assert_eq!(part.meta.etag, format!("et{}", n));
1988            match &part.body {
1989                BodyRef::Disk { path, size, .. } => {
1990                    assert_eq!(*size, body.len() as u64);
1991                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
1992                }
1993                _ => panic!("expected Disk"),
1994            }
1995        }
1996    }
1997
1998    #[test]
1999    fn mpu_abort_removes_upload() {
2000        let tmp = TempDir::new().unwrap();
2001        let store = new_store(&tmp);
2002        store
2003            .put_bucket_meta(
2004                "b",
2005                &BucketMeta {
2006                    name: "b".to_string(),
2007                    ..Default::default()
2008                },
2009            )
2010            .unwrap();
2011        store
2012            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2013            .unwrap();
2014        store
2015            .mpu_put_part(
2016                "b",
2017                "up",
2018                1,
2019                BodySource::Bytes(Bytes::from_static(b"x")),
2020                "e",
2021            )
2022            .unwrap();
2023        store.mpu_abort("b", "up").unwrap();
2024        assert!(!store.mpu_dir("b", "up").exists());
2025        // Idempotent.
2026        store.mpu_abort("b", "up").unwrap();
2027        let loaded = store.load().unwrap();
2028        let snap = loaded.buckets.get("b").unwrap();
2029        assert!(snap.multipart_uploads.is_empty());
2030    }
2031
2032    #[test]
2033    fn mpu_complete_single_part_fast_path() {
2034        let tmp = TempDir::new().unwrap();
2035        let store = new_store(&tmp);
2036        store
2037            .put_bucket_meta(
2038                "b",
2039                &BucketMeta {
2040                    name: "b".to_string(),
2041                    ..Default::default()
2042                },
2043            )
2044            .unwrap();
2045        store
2046            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2047            .unwrap();
2048        let body = Bytes::from_static(b"only-part-bytes");
2049        store
2050            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2051            .unwrap();
2052        let meta = sample_meta("k", body.len() as u64);
2053        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2054        match &body_ref {
2055            BodyRef::Disk { path, size, .. } => {
2056                assert_eq!(*size, body.len() as u64);
2057                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2058            }
2059            _ => panic!("expected Disk"),
2060        }
2061        assert!(!store.mpu_dir("b", "up").exists());
2062    }
2063
2064    #[test]
2065    fn mpu_complete_multi_part_concat() {
2066        let tmp = TempDir::new().unwrap();
2067        let store = new_store(&tmp);
2068        store
2069            .put_bucket_meta(
2070                "b",
2071                &BucketMeta {
2072                    name: "b".to_string(),
2073                    ..Default::default()
2074                },
2075            )
2076            .unwrap();
2077        store
2078            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2079            .unwrap();
2080        let p1 = Bytes::from_static(b"AAAA");
2081        let p2 = Bytes::from_static(b"BBBBBB");
2082        let p3 = Bytes::from_static(b"CC");
2083        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2084            store
2085                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2086                .unwrap();
2087        }
2088        let mut expected = Vec::new();
2089        expected.extend_from_slice(&p1);
2090        expected.extend_from_slice(&p2);
2091        expected.extend_from_slice(&p3);
2092        let meta = sample_meta("k", expected.len() as u64);
2093        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2094        let path = match body_ref {
2095            BodyRef::Disk { path, size, .. } => {
2096                assert_eq!(size, expected.len() as u64);
2097                path
2098            }
2099            _ => panic!("expected Disk"),
2100        };
2101        assert_eq!(std::fs::read(&path).unwrap(), expected);
2102        assert!(!store.mpu_dir("b", "up").exists());
2103    }
2104
2105    #[test]
2106    fn mpu_complete_large_streaming_via_file_source() {
2107        let tmp = TempDir::new().unwrap();
2108        let store = new_store(&tmp);
2109        store
2110            .put_bucket_meta(
2111                "b",
2112                &BucketMeta {
2113                    name: "b".to_string(),
2114                    ..Default::default()
2115                },
2116            )
2117            .unwrap();
2118        store
2119            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2120            .unwrap();
2121
2122        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2123        // exercise the BodySource::File + streaming concat path).
2124        const PART_SIZE: usize = 1024 * 1024;
2125        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2126        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2127        for (i, byte) in patterns.iter().enumerate() {
2128            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2129            let data = vec![*byte; PART_SIZE];
2130            std::fs::write(&src, &data).unwrap();
2131            expected.extend_from_slice(&data);
2132            store
2133                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2134                .unwrap();
2135        }
2136        let meta = sample_meta("k", expected.len() as u64);
2137        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2138        let path = match body_ref {
2139            BodyRef::Disk { path, size, .. } => {
2140                assert_eq!(size, expected.len() as u64);
2141                path
2142            }
2143            _ => panic!("expected Disk"),
2144        };
2145        let actual = std::fs::read(&path).unwrap();
2146        assert_eq!(actual.len(), expected.len());
2147        assert_eq!(actual, expected);
2148        assert!(!store.mpu_dir("b", "up").exists());
2149    }
2150
2151    #[test]
2152    fn mpu_resumable_across_load() {
2153        let tmp = TempDir::new().unwrap();
2154        let store = new_store(&tmp);
2155        store
2156            .put_bucket_meta(
2157                "b",
2158                &BucketMeta {
2159                    name: "b".to_string(),
2160                    ..Default::default()
2161                },
2162            )
2163            .unwrap();
2164        store
2165            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2166            .unwrap();
2167        let p1 = Bytes::from_static(b"hello-");
2168        let p2 = Bytes::from_static(b"world-");
2169        store
2170            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2171            .unwrap();
2172        store
2173            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2174            .unwrap();
2175
2176        // Simulate a restart: re-open the store on the same dir and load.
2177        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2178        let loaded = store2.load().unwrap();
2179        let snap = loaded.buckets.get("b").unwrap();
2180        let m = snap.multipart_uploads.get("up").unwrap();
2181        assert_eq!(m.parts.len(), 2);
2182        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2183        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2184
2185        // Continue the upload on the fresh store.
2186        let p3 = Bytes::from_static(b"again!");
2187        store2
2188            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2189            .unwrap();
2190        let mut expected = Vec::new();
2191        expected.extend_from_slice(&p1);
2192        expected.extend_from_slice(&p2);
2193        expected.extend_from_slice(&p3);
2194        let meta = sample_meta("k", expected.len() as u64);
2195        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2196        let path = match body_ref {
2197            BodyRef::Disk { path, .. } => path,
2198            _ => panic!(),
2199        };
2200        assert_eq!(std::fs::read(&path).unwrap(), expected);
2201        assert!(!store2.mpu_dir("b", "up").exists());
2202    }
2203
2204    #[test]
2205    fn tags_snapshot_roundtrip_via_store() {
2206        let tmp = TempDir::new().unwrap();
2207        let store = new_store(&tmp);
2208        store
2209            .put_bucket_meta(
2210                "b",
2211                &BucketMeta {
2212                    name: "b".to_string(),
2213                    ..Default::default()
2214                },
2215            )
2216            .unwrap();
2217        let mut tags = HashMap::new();
2218        tags.insert("env".to_string(), "prod".to_string());
2219        tags.insert("team".to_string(), "s3".to_string());
2220        let snap = TagsSnapshot { tags: tags.clone() };
2221        let payload = toml::to_string(&snap).unwrap();
2222        store
2223            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2224            .unwrap();
2225        let loaded = store.load().unwrap();
2226        let text = loaded
2227            .buckets
2228            .get("b")
2229            .unwrap()
2230            .subresources
2231            .get("tags.toml")
2232            .cloned()
2233            .unwrap();
2234        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2235        assert_eq!(decoded.tags, tags);
2236    }
2237
2238    #[test]
2239    fn acl_snapshot_roundtrip_via_store() {
2240        let tmp = TempDir::new().unwrap();
2241        let store = new_store(&tmp);
2242        store
2243            .put_bucket_meta(
2244                "b",
2245                &BucketMeta {
2246                    name: "b".to_string(),
2247                    ..Default::default()
2248                },
2249            )
2250            .unwrap();
2251        let snap = AclSnapshot {
2252            owner_id: "owner-abc".to_string(),
2253            grants: vec![
2254                AclGrantSnapshot {
2255                    grantee_type: "CanonicalUser".to_string(),
2256                    grantee_id: Some("owner-abc".to_string()),
2257                    grantee_display_name: Some("owner".to_string()),
2258                    grantee_uri: None,
2259                    permission: "FULL_CONTROL".to_string(),
2260                },
2261                AclGrantSnapshot {
2262                    grantee_type: "Group".to_string(),
2263                    grantee_id: None,
2264                    grantee_display_name: None,
2265                    grantee_uri: Some(
2266                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2267                    ),
2268                    permission: "READ".to_string(),
2269                },
2270            ],
2271        };
2272        let payload = toml::to_string(&snap).unwrap();
2273        store
2274            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2275            .unwrap();
2276        let loaded = store.load().unwrap();
2277        let text = loaded
2278            .buckets
2279            .get("b")
2280            .unwrap()
2281            .subresources
2282            .get("acl.toml")
2283            .cloned()
2284            .unwrap();
2285        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2286        assert_eq!(decoded.owner_id, "owner-abc");
2287        assert_eq!(decoded.grants.len(), 2);
2288        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2289        assert_eq!(decoded.grants[1].grantee_type, "Group");
2290    }
2291
2292    #[test]
2293    fn inventory_snapshot_roundtrip_via_store() {
2294        let tmp = TempDir::new().unwrap();
2295        let store = new_store(&tmp);
2296        store
2297            .put_bucket_meta(
2298                "b",
2299                &BucketMeta {
2300                    name: "b".to_string(),
2301                    ..Default::default()
2302                },
2303            )
2304            .unwrap();
2305        let mut configs = HashMap::new();
2306        configs.insert(
2307            "inv-1".to_string(),
2308            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2309        );
2310        configs.insert(
2311            "inv-2".to_string(),
2312            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2313        );
2314        let snap = InventorySnapshot {
2315            configs: configs.clone(),
2316        };
2317        let payload = toml::to_string(&snap).unwrap();
2318        store
2319            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2320            .unwrap();
2321        let loaded = store.load().unwrap();
2322        let text = loaded
2323            .buckets
2324            .get("b")
2325            .unwrap()
2326            .subresources
2327            .get("inventory.toml")
2328            .cloned()
2329            .unwrap();
2330        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2331        assert_eq!(decoded.configs, configs);
2332    }
2333
2334    #[test]
2335    fn legacy_versioning_file_is_read() {
2336        let tmp = TempDir::new().unwrap();
2337        let store = new_store(&tmp);
2338        // Bucket meta with no versioning field set.
2339        store
2340            .put_bucket_meta(
2341                "b",
2342                &BucketMeta {
2343                    name: "b".to_string(),
2344                    ..Default::default()
2345                },
2346            )
2347            .unwrap();
2348        // Legacy sidecar: a bare versioning.toml with "Enabled".
2349        let path = store.bucket_dir("b").join("versioning.toml");
2350        std::fs::write(&path, "Enabled").unwrap();
2351        let loaded = store.load().unwrap();
2352        let snap = loaded.buckets.get("b").unwrap();
2353        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2354    }
2355}