Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Root of the persisted S3 state, as returned by [`S3Store::load`].
///
/// Every field is `#[serde(default)]`, so a field missing from the
/// serialized form deserializes to its `Default` value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    /// Account identifier; empty when absent from the serialized form.
    #[serde(default)]
    pub account_id: String,
    /// Region name; empty when absent from the serialized form.
    #[serde(default)]
    pub region: String,
    /// Per-bucket snapshots. `DiskS3Store::load` keys this map by
    /// `BucketMeta::name`.
    #[serde(default)]
    pub buckets: BTreeMap<String, BucketSnapshot>,
}
18
/// Everything loaded for one bucket: its metadata, objects, version lists,
/// raw subresource documents and multipart uploads.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket-level metadata. Has no `#[serde(default)]`, so it must be
    /// present when deserializing.
    pub meta: BucketMeta,
    /// One object per key, as populated by `DiskS3Store::load`.
    #[serde(default)]
    pub objects: BTreeMap<String, LoadedObject>,
    /// Version lists per key. `DiskS3Store::load` sorts each list by
    /// `ObjectMeta::last_modified`, oldest first.
    #[serde(default)]
    pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
    /// Raw subresource text. `DiskS3Store::load` keys it by the subresource
    /// file name (for example `"cors.toml"`).
    #[serde(default)]
    pub subresources: BTreeMap<String, String>,
    /// Multipart uploads. `DiskS3Store::load` keys them by
    /// `MpuInit::upload_id`.
    #[serde(default)]
    pub multipart_uploads: BTreeMap<String, LoadedMpu>,
}
31
/// A multipart upload as loaded from a store: its init record plus the
/// parts found for it.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Upload initiation record. Required when deserializing.
    pub init: MpuInit,
    /// Parts keyed by part number.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One multipart-upload part: its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata. Required when deserializing.
    pub meta: UploadPartMeta,
    /// Where the part body lives. Falls back to `BodyRef::default()` (an
    /// empty in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One object (or object version): its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata. Required when deserializing.
    pub meta: ObjectMeta,
    /// Where the object body lives. Falls back to `BodyRef::default()` (an
    /// empty in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata. `DiskS3Store` reads and writes it as the bucket's
/// `meta.toml`.
///
/// Every field has a serde default, so older or partial files still parse.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name. `DiskS3Store::load` uses it as the key in
    /// `S3State::buckets` and as the `bucket` of every `BodyRef::Disk`.
    #[serde(default)]
    pub name: String,
    /// Creation timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    /// Region recorded for the bucket.
    #[serde(default)]
    pub region: String,
    /// Versioning status string. When this is `None` after parsing,
    /// `DiskS3Store::load` backfills it from the trimmed, non-empty contents
    /// of `versioning.toml`.
    #[serde(default)]
    pub versioning: Option<String>,
    /// MFA-Delete status ("Enabled"/"Disabled") set via PutBucketVersioning.
    #[serde(default)]
    pub mfa_delete: Option<String>,
    #[serde(default)]
    pub acl: Option<String>,
    #[serde(default)]
    pub acl_owner_id: String,
    #[serde(default)]
    pub accelerate_status: Option<String>,
    #[serde(default)]
    pub eventbridge_enabled: bool,
    #[serde(default)]
    pub lifecycle_transition_default_min_size: Option<String>,
}
77
78fn default_time() -> DateTime<Utc> {
79    DateTime::<Utc>::MIN_UTC
80}
81
/// Serializable form of a single ACL grant.
///
/// `grantee_type` and `permission` carry no serde default and therefore must
/// be present when deserializing; the remaining fields are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Kind of grantee (required).
    pub grantee_type: String,
    #[serde(default)]
    pub grantee_id: Option<String>,
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Permission granted (required).
    pub permission: String,
}
93
/// Serializable tag set: a sorted map from tag key to tag value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    /// Tag key/value pairs; empty when absent from the serialized form.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
}
99
/// Serializable ACL: an owner id plus its list of grants.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    /// Owner identifier; empty when absent from the serialized form.
    #[serde(default)]
    pub owner_id: String,
    /// Grants attached to the ACL; empty when absent.
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}
107
/// Serializable set of inventory configurations.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    /// String-to-string map of configurations.
    // NOTE(review): presumably configuration id -> serialized configuration;
    // confirm against the code that fills this map.
    #[serde(default)]
    pub configs: BTreeMap<String, String>,
}
113
/// Per-object (or per-version) metadata. `DiskS3Store::load` parses it from
/// the `<version-tag>.toml` sidecar stored in the object's directory.
///
/// Every field has a serde default, so older or partial sidecars still parse.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses this value (not the escaped
    /// directory name) as the key in `BucketSnapshot::objects` and
    /// `BucketSnapshot::object_versions`.
    #[serde(default)]
    pub key: String,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub etag: String,
    #[serde(default)]
    pub size: u64,
    /// Modification timestamp; falls back to `default_time()` when absent.
    /// `DiskS3Store::load` orders the versions of a key by this field.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    #[serde(default)]
    pub parts_count: Option<u32>,
    // NOTE(review): presumably (part_number, size) pairs; confirm with the
    // multipart completion code.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// Version id. `DiskS3Store::load` treats a sidecar as the plain
    /// (non-versioned) object only when its file tag is `null` and this
    /// field is `None`.
    #[serde(default)]
    pub version_id: Option<String>,
    /// Marks a delete marker. `DiskS3Store::load` gives delete markers an
    /// empty in-memory body and does not look for a `.bin` file.
    #[serde(default)]
    pub is_delete_marker: bool,
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    #[serde(default)]
    pub restore_expiry: Option<String>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    #[serde(default)]
    pub checksum_value: Option<String>,
    #[serde(default)]
    pub lock_mode: Option<String>,
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    #[serde(default)]
    pub content_encoding: Option<String>,
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
169
/// Initiation record of a multipart upload. `DiskS3Store::load` parses it
/// from the upload directory's `init.toml`.
///
/// `upload_id` and `key` carry no serde default and must be present.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    /// Upload identifier (required). `DiskS3Store::load` keys
    /// `BucketSnapshot::multipart_uploads` by this value.
    pub upload_id: String,
    /// Object key of the upload (required).
    pub key: String,
    /// Initiation timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    #[serde(default)]
    pub tagging: Option<String>,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}
193
/// Metadata of one uploaded part. `DiskS3Store::load` parses it from
/// `parts/<part_number>.toml`.
///
/// `part_number`, `etag` and `size` carry no serde default and must be
/// present when deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    /// Part number (required).
    pub part_number: u32,
    /// Part ETag (required).
    pub etag: String,
    /// Part size (required).
    pub size: u64,
    /// Modification timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
202
/// Kinds of bucket subresource a store can persist.
///
/// `DiskS3Store` maps each variant to a fixed file name inside the bucket
/// directory (see `DiskS3Store::subresource_filename`). Every variant is
/// also listed in [`ALL_SUBRESOURCES`], which `DiskS3Store::load` iterates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
229
/// Every [`BucketSubresource`] variant, in declaration order.
///
/// `DiskS3Store::load` iterates this slice to find subresource files, so a
/// variant missing from it would never be loaded. Keep it in sync with the
/// enum when adding a variant.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
    BucketSubresource::Analytics,
    BucketSubresource::IntelligentTiering,
    BucketSubresource::Metrics,
    BucketSubresource::RequestPayment,
    BucketSubresource::Abac,
    BucketSubresource::MetadataConfiguration,
    BucketSubresource::MetadataTableConfiguration,
];
255
/// Handle to a stored body: either bytes held in memory or a file on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Body held in RAM. Marked `#[serde(skip)]`, so this variant is
    /// excluded from serialization and deserialization.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file.
    Disk {
        /// Bucket the body belongs to.
        bucket: String,
        /// Object key. For multipart parts `DiskS3Store::load` stores
        /// `__mpu__/<upload_id>` here.
        key: String,
        /// Version tag; `DiskS3Store::load` sets `None` for the `null`
        /// version.
        #[serde(default)]
        version: Option<String>,
        /// Location of the body file.
        path: PathBuf,
        /// Body length in bytes. `DiskS3Store::load` takes it from the
        /// file's metadata.
        size: u64,
    },
}
269
270impl BodyRef {
271    pub fn size(&self) -> u64 {
272        match self {
273            BodyRef::Memory(b) => b.len() as u64,
274            BodyRef::Disk { size, .. } => *size,
275        }
276    }
277}
278
279impl Default for BodyRef {
280    fn default() -> Self {
281        BodyRef::Memory(Bytes::new())
282    }
283}
284
/// Body input handed to the store's write methods
/// ([`S3Store::put_object`], [`S3Store::mpu_put_part`]).
#[derive(Debug)]
pub enum BodySource {
    /// Body already held in memory.
    Bytes(Bytes),
    /// Existing disk path that should be *moved* into the destination via
    /// rename (upload-tmp → final) and consumed.
    File(PathBuf),
    /// Existing disk path that should be *copied* into the destination and
    /// left in place. Used by replication so the source object stays
    /// available while the replica is produced.
    FileCopy(PathBuf),
}
296
297/// Materialize a [`BodySource`] into [`Bytes`]. The `File` variant is
298/// consumed (file is unlinked after read); `FileCopy` is left in place.
299/// Memory-mode stores call this to absorb tempfiles produced by the
300/// streaming dispatch path so streaming and non-streaming services share
301/// the same wire-shape regardless of `--storage-mode`.
302fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
303    match body {
304        BodySource::Bytes(b) => Ok(b),
305        BodySource::File(p) => {
306            let bytes = std::fs::read(&p)?;
307            // Best-effort: a leftover tempfile is not a correctness
308            // problem so the read result wins over the unlink result.
309            let _ = std::fs::remove_file(&p);
310            Ok(Bytes::from(bytes))
311        }
312        BodySource::FileCopy(p) => {
313            let bytes = std::fs::read(&p)?;
314            Ok(Bytes::from(bytes))
315        }
316    }
317}
318
/// Errors returned by [`S3Store`] implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying filesystem error. The `#[from]` conversion lets `?`
    /// propagate `std::io::Error` directly.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// (De)serialization failure, carried as text. `DiskS3Store::load`
    /// uses it for TOML parse errors.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not available on this store implementation.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by a free-form message.
    #[error("{0}")]
    Other(String),
}
330
/// Result alias used throughout the store API; the error is [`StoreError`].
pub type StoreResult<T> = Result<T, StoreError>;
332
/// Persistence backend for S3 state.
///
/// Implementations must be `Send + Sync`. Two are defined in this file:
/// `MemoryS3Store`, which persists nothing, and `DiskS3Store`, which keeps
/// state under a root directory.
pub trait S3Store: Send + Sync {
    /// Load all persisted state. `MemoryS3Store` returns an empty
    /// [`S3State`]; `DiskS3Store` scans its `buckets` directory.
    fn load(&self) -> StoreResult<S3State>;

    /// Persist the bucket-level metadata of `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persist the raw `payload` text of one subresource `kind` of `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Remove one subresource of `bucket`. `DiskS3Store` treats an already
    /// missing file as success.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Remove `bucket`. `DiskS3Store` deletes the bucket directory
    /// recursively and treats a missing directory as success.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Persist an object body together with its metadata and return the
    /// [`BodyRef`] describing where the body now lives.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Persist `meta` for the given key/version without supplying a body.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Remove the given object (or object version).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Resolve a [`BodyRef`] to its bytes. `MemoryS3Store` only supports
    /// `BodyRef::Memory`.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Persist the initiation record of a multipart upload.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Persist a multipart-upload part body. Returns the [`BodyRef`] the
    /// service should keep in its in-memory part map: `BodyRef::Disk`
    /// for disk-backed stores (so subsequent reads stream from the
    /// `.bin` file instead of holding the part in RAM), or
    /// `BodyRef::Memory` for memory-mode stores.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<BodyRef>;
    /// Where the streaming dispatch path should spool large request
    /// bodies to disk before handing them to [`Self::put_object`] /
    /// [`Self::mpu_put_part`]. Returning `Some` keeps the spool file on
    /// the same filesystem as the final destination so `rename(2)` is a
    /// metadata-only move; returning `None` means "use the system temp
    /// dir, the store will read the file back into RAM and unlink it".
    ///
    /// The provided default returns `None`.
    fn spool_dir(&self) -> Option<std::path::PathBuf> {
        None
    }
    /// Discard a multipart upload.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Finish a multipart upload as object `final_key` and return the
    /// [`BodyRef`] of the resulting body. `MemoryS3Store` returns an empty
    /// in-memory body.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
397
/// Store backend without any persistence: a stateless unit struct whose
/// [`S3Store`] implementation keeps bodies in RAM and writes nothing.
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Create the memory store. It carries no state, so this is free.
    pub fn new() -> Self {
        MemoryS3Store
    }
}

impl Default for MemoryS3Store {
    /// Equivalent to [`MemoryS3Store::new`].
    fn default() -> Self {
        MemoryS3Store
    }
}
411
412impl S3Store for MemoryS3Store {
413    fn load(&self) -> StoreResult<S3State> {
414        Ok(S3State::default())
415    }
416
417    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
418        Ok(())
419    }
420    fn put_bucket_subresource(
421        &self,
422        _bucket: &str,
423        _kind: BucketSubresource,
424        _payload: &str,
425    ) -> StoreResult<()> {
426        Ok(())
427    }
428    fn delete_bucket_subresource(
429        &self,
430        _bucket: &str,
431        _kind: BucketSubresource,
432    ) -> StoreResult<()> {
433        Ok(())
434    }
435    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
436        Ok(())
437    }
438
439    fn put_object(
440        &self,
441        _bucket: &str,
442        _key: &str,
443        _version: Option<&str>,
444        body: BodySource,
445        _meta: &ObjectMeta,
446    ) -> StoreResult<BodyRef> {
447        Ok(BodyRef::Memory(read_body_source(body)?))
448    }
449    fn put_object_meta(
450        &self,
451        _bucket: &str,
452        _key: &str,
453        _version: Option<&str>,
454        _meta: &ObjectMeta,
455    ) -> StoreResult<()> {
456        Ok(())
457    }
458    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
459        Ok(())
460    }
461    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
462        match body {
463            BodyRef::Memory(b) => Ok(b.clone()),
464            BodyRef::Disk { .. } => {
465                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
466            }
467        }
468    }
469
470    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
471        Ok(())
472    }
473    fn mpu_put_part(
474        &self,
475        _bucket: &str,
476        _upload_id: &str,
477        _part_number: u32,
478        body: BodySource,
479        _etag: &str,
480    ) -> StoreResult<BodyRef> {
481        // Memory mode keeps all part bodies in RAM. Absorb any tempfile
482        // produced by the streaming dispatch path here.
483        Ok(BodyRef::Memory(read_body_source(body)?))
484    }
485    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
486        Ok(())
487    }
488    fn mpu_complete(
489        &self,
490        _bucket: &str,
491        _upload_id: &str,
492        _final_key: &str,
493        _version: Option<&str>,
494        _meta: &ObjectMeta,
495    ) -> StoreResult<BodyRef> {
496        Ok(BodyRef::Memory(Bytes::new()))
497    }
498}
499
/// Disk-backed store: all state lives in files below `root`.
pub struct DiskS3Store {
    /// Root directory; buckets are stored under `<root>/buckets`.
    root: PathBuf,
    /// Shared body cache handed in at construction.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
504
505impl DiskS3Store {
506    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
507        Self { root, cache }
508    }
509
510    fn buckets_dir(&self) -> PathBuf {
511        self.root.join("buckets")
512    }
513
514    fn bucket_dir(&self, bucket: &str) -> PathBuf {
515        self.buckets_dir()
516            .join(crate::key_escape::escape_key_segment(bucket))
517    }
518
519    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
520        self.bucket_dir(bucket)
521            .join("objects")
522            .join(crate::key_escape::escape_key_segment(key))
523    }
524
525    fn version_tag(version: Option<&str>) -> String {
526        version.unwrap_or("null").to_string()
527    }
528
529    fn object_paths(
530        &self,
531        bucket: &str,
532        key: &str,
533        version: Option<&str>,
534    ) -> (PathBuf, PathBuf, PathBuf) {
535        let dir = self.object_dir(bucket, key);
536        let tag = Self::version_tag(version);
537        let bin = dir.join(format!("{}.bin", tag));
538        let toml = dir.join(format!("{}.toml", tag));
539        (dir, bin, toml)
540    }
541
542    fn subresource_filename(kind: BucketSubresource) -> &'static str {
543        match kind {
544            BucketSubresource::Tags => "tags.toml",
545            BucketSubresource::Lifecycle => "lifecycle.toml",
546            BucketSubresource::Cors => "cors.toml",
547            BucketSubresource::Policy => "policy.toml",
548            BucketSubresource::Notification => "notification.toml",
549            BucketSubresource::Logging => "logging.toml",
550            BucketSubresource::Website => "website.toml",
551            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
552            BucketSubresource::ObjectLock => "object_lock.toml",
553            BucketSubresource::Replication => "replication.toml",
554            BucketSubresource::Ownership => "ownership.toml",
555            BucketSubresource::Inventory => "inventory.toml",
556            BucketSubresource::Encryption => "encryption.toml",
557            BucketSubresource::Versioning => "versioning.toml",
558            BucketSubresource::Acl => "acl.toml",
559            BucketSubresource::Accelerate => "accelerate.toml",
560            BucketSubresource::Analytics => "analytics.toml",
561            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
562            BucketSubresource::Metrics => "metrics.toml",
563            BucketSubresource::RequestPayment => "request_payment.toml",
564            BucketSubresource::Abac => "abac.toml",
565            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
566            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
567        }
568    }
569
570    fn cleanup_empty(dir: &std::path::Path) {
571        let _ = std::fs::remove_dir(dir);
572    }
573
574    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
575        self.bucket_dir(bucket)
576            .join("mpu")
577            .join(crate::key_escape::escape_key_segment(upload_id))
578    }
579
580    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
581        self.mpu_dir(bucket, upload_id).join("parts")
582    }
583
584    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
585        self.mpu_parts_dir(bucket, upload_id)
586            .join(format!("{}.bin", part_number))
587    }
588
589    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
590        self.mpu_parts_dir(bucket, upload_id)
591            .join(format!("{}.toml", part_number))
592    }
593
594    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
595        crate::cache::BodyKey::new(
596            bucket.to_string(),
597            format!("__mpu__/{}", upload_id),
598            Some(format!("part-{}", part_number)),
599        )
600    }
601}
602
603fn io_other(msg: impl Into<String>) -> StoreError {
604    StoreError::Other(msg.into())
605}
606
607impl S3Store for DiskS3Store {
    /// Rebuild the full [`S3State`] by scanning `<root>/buckets`.
    ///
    /// Returns an empty state when the buckets directory does not exist.
    /// Each bucket directory is loaded in isolation: a failure inside one
    /// bucket is logged with `tracing::warn!` and that bucket is left out,
    /// while the remaining buckets still load.
    ///
    /// # Errors
    ///
    /// Only failures to enumerate the buckets directory itself (or to stat
    /// one of its entries) are returned to the caller.
    fn load(&self) -> StoreResult<S3State> {
        let mut state = S3State::default();
        let buckets_dir = self.buckets_dir();
        if !buckets_dir.exists() {
            return Ok(state);
        }
        for entry in std::fs::read_dir(&buckets_dir)? {
            let entry = entry?;
            // Only directories can be buckets; stray files are ignored.
            if !entry.file_type()?.is_dir() {
                continue;
            }
            let bdir = entry.path();
            // Isolate per-bucket load so one corrupt/partial bucket is skipped
            // with a warning instead of aborting the whole store load -- a single
            // truncated file used to make every bucket inaccessible (bug-audit
            // 2026-06-20, 4.3).
            let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
                // A directory without meta.toml is not treated as a bucket
                // and is skipped without a warning.
                let meta_path = bdir.join("meta.toml");
                if !meta_path.exists() {
                    return Ok(None);
                }
                let meta_text = std::fs::read_to_string(&meta_path)?;
                let mut meta: BucketMeta =
                    toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
                let mut snap = BucketSnapshot {
                    meta: meta.clone(),
                    objects: BTreeMap::new(),
                    object_versions: BTreeMap::new(),
                    subresources: BTreeMap::new(),
                    multipart_uploads: BTreeMap::new(),
                };

                // Subresources: keep the raw file text, keyed by file name.
                for kind in ALL_SUBRESOURCES {
                    let fname = Self::subresource_filename(*kind);
                    let path = bdir.join(fname);
                    if path.exists() {
                        let text = std::fs::read_to_string(&path)?;
                        // Backfill the versioning status from versioning.toml
                        // when meta.toml did not carry one.
                        if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
                        {
                            let stripped = text.trim();
                            if !stripped.is_empty() {
                                snap.meta.versioning = Some(stripped.to_string());
                                // Mirrors the backfill into the local copy;
                                // below, only `meta.name` is read from it.
                                meta.versioning = snap.meta.versioning.clone();
                            }
                        }
                        snap.subresources.insert(fname.to_string(), text);
                    }
                }

                // Objects: one directory per key, one `<tag>.toml` sidecar
                // (plus `<tag>.bin` body) per version.
                let objects_root = bdir.join("objects");
                if objects_root.exists() {
                    for okey_entry in std::fs::read_dir(&objects_root)? {
                        let okey_entry = okey_entry?;
                        if !okey_entry.file_type()?.is_dir() {
                            continue;
                        }
                        let key_dir = okey_entry.path();
                        let mut versioned: Vec<LoadedObject> = Vec::new();
                        // Real key, taken from the first sidecar parsed (the
                        // directory name is the escaped form).
                        let mut key_name: Option<String> = None;
                        for version_entry in std::fs::read_dir(&key_dir)? {
                            let version_entry = version_entry?;
                            let path = version_entry.path();
                            // Non-UTF-8 names and anything that is not a
                            // sidecar are skipped.
                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
                                continue;
                            };
                            if !fname.ends_with(".toml") {
                                continue;
                            }
                            // Strip the 5-byte ".toml" suffix checked above.
                            let version_tag = &fname[..fname.len() - 5];
                            let toml_text = std::fs::read_to_string(&path)?;
                            let obj_meta: ObjectMeta = toml::from_str(&toml_text)
                                .map_err(|e| StoreError::Serde(e.to_string()))?;
                            let bin_path = key_dir.join(format!("{}.bin", version_tag));
                            let (body, size) = if obj_meta.is_delete_marker {
                                // Delete markers have no body file.
                                (BodyRef::Memory(Bytes::new()), 0u64)
                            } else if bin_path.exists() {
                                let sz = std::fs::metadata(&bin_path)?.len();
                                (
                                    BodyRef::Disk {
                                        bucket: meta.name.clone(),
                                        key: obj_meta.key.clone(),
                                        // The "null" tag stands for "no version".
                                        version: if version_tag == "null" {
                                            None
                                        } else {
                                            Some(version_tag.to_string())
                                        },
                                        path: bin_path,
                                        size: sz,
                                    },
                                    sz,
                                )
                            } else {
                                // Fail loud: the sidecar says this object has a
                                // body but the .bin file is missing. Returning
                                // silently would hand the caller a truncated
                                // object and hide data loss.
                                return Err(StoreError::Other(format!(
                                    "missing body file: {}",
                                    bin_path.display()
                                )));
                            };
                            // `size` is not used further; this discards it
                            // explicitly.
                            let _ = size;
                            key_name.get_or_insert_with(|| obj_meta.key.clone());
                            // A "null" tag without a version id is the plain
                            // (non-versioned) object; everything else joins
                            // the version list.
                            if version_tag == "null" && obj_meta.version_id.is_none() {
                                snap.objects.insert(
                                    obj_meta.key.clone(),
                                    LoadedObject {
                                        meta: obj_meta,
                                        body,
                                    },
                                );
                            } else {
                                versioned.push(LoadedObject {
                                    meta: obj_meta,
                                    body,
                                });
                            }
                        }
                        if !versioned.is_empty() {
                            // Oldest first, so `last()` is the newest version.
                            versioned.sort_by_key(|v| v.meta.last_modified);
                            if let Some(key) = key_name {
                                // Reconcile snap.objects with the newest version:
                                // a trailing delete marker hides any prior null or
                                // live version (remove it); otherwise overwrite
                                // with the newest live version, even if a
                                // pre-versioning null.toml had already been
                                // inserted during the non-versioned scan.
                                match versioned.last() {
                                    Some(newest) if newest.meta.is_delete_marker => {
                                        snap.objects.remove(&key);
                                    }
                                    Some(newest) => {
                                        snap.objects.insert(key.clone(), newest.clone());
                                    }
                                    None => {}
                                }
                                snap.object_versions.insert(key, versioned);
                            }
                        }
                    }
                }

                // Multipart uploads: one directory per upload with an
                // init.toml and an optional parts/ directory.
                let mpu_root = bdir.join("mpu");
                if mpu_root.exists() {
                    for upload_entry in std::fs::read_dir(&mpu_root)? {
                        let upload_entry = upload_entry?;
                        if !upload_entry.file_type()?.is_dir() {
                            continue;
                        }
                        let upload_dir = upload_entry.path();
                        // Upload directories without init.toml are skipped.
                        let init_path = upload_dir.join("init.toml");
                        if !init_path.exists() {
                            continue;
                        }
                        let init_text = std::fs::read_to_string(&init_path)?;
                        let init: MpuInit = toml::from_str(&init_text)
                            .map_err(|e| StoreError::Serde(e.to_string()))?;
                        let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
                        let parts_dir = upload_dir.join("parts");
                        if parts_dir.exists() {
                            for part_entry in std::fs::read_dir(&parts_dir)? {
                                let part_entry = part_entry?;
                                let path = part_entry.path();
                                let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
                                    continue;
                                };
                                if !fname.ends_with(".toml") {
                                    continue;
                                }
                                // The file stem is the part number; sidecars
                                // whose stem is not a u32 are ignored.
                                let stem = &fname[..fname.len() - 5];
                                let Ok(part_number) = stem.parse::<u32>() else {
                                    continue;
                                };
                                let toml_text = std::fs::read_to_string(&path)?;
                                let part_meta: UploadPartMeta = toml::from_str(&toml_text)
                                    .map_err(|e| StoreError::Serde(e.to_string()))?;
                                let bin_path = parts_dir.join(format!("{}.bin", part_number));
                                // A part sidecar without its body is an error,
                                // like a missing object body above.
                                if !bin_path.exists() {
                                    return Err(StoreError::Other(format!(
                                        "missing multipart part body file: {}",
                                        bin_path.display()
                                    )));
                                }
                                let sz = std::fs::metadata(&bin_path)?.len();
                                // Same key/version naming that
                                // `Self::mpu_body_key` produces.
                                let body = BodyRef::Disk {
                                    bucket: meta.name.clone(),
                                    key: format!("__mpu__/{}", init.upload_id),
                                    version: Some(format!("part-{}", part_number)),
                                    path: bin_path,
                                    size: sz,
                                };
                                loaded_parts.insert(
                                    part_number,
                                    LoadedPart {
                                        meta: part_meta,
                                        body,
                                    },
                                );
                            }
                        }
                        snap.multipart_uploads.insert(
                            init.upload_id.clone(),
                            LoadedMpu {
                                init,
                                parts: loaded_parts,
                            },
                        );
                    }
                }

                Ok(Some(snap))
            })();
            match loaded {
                Ok(Some(snap)) => {
                    // Buckets are keyed by the name stored in meta.toml.
                    state.buckets.insert(snap.meta.name.clone(), snap);
                }
                Ok(None) => {}
                Err(e) => tracing::warn!(
                    bucket = %bdir.display(),
                    error = %e,
                    "skipping unreadable S3 bucket during load"
                ),
            }
        }
        Ok(state)
    }
834
835    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
836        let dir = self.bucket_dir(bucket);
837        std::fs::create_dir_all(&dir)?;
838        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
839        Ok(())
840    }
841
842    fn put_bucket_subresource(
843        &self,
844        bucket: &str,
845        kind: BucketSubresource,
846        payload: &str,
847    ) -> StoreResult<()> {
848        let dir = self.bucket_dir(bucket);
849        std::fs::create_dir_all(&dir)?;
850        let path = dir.join(Self::subresource_filename(kind));
851        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
852        Ok(())
853    }
854
855    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
856        let path = self
857            .bucket_dir(bucket)
858            .join(Self::subresource_filename(kind));
859        match std::fs::remove_file(&path) {
860            Ok(_) => Ok(()),
861            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
862            Err(e) => Err(e.into()),
863        }
864    }
865
866    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
867        let dir = self.bucket_dir(bucket);
868        match std::fs::remove_dir_all(&dir) {
869            Ok(_) => Ok(()),
870            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
871            Err(e) => Err(e.into()),
872        }
873    }
874
875    fn put_object(
876        &self,
877        bucket: &str,
878        key: &str,
879        version: Option<&str>,
880        body: BodySource,
881        meta: &ObjectMeta,
882    ) -> StoreResult<BodyRef> {
883        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
884        std::fs::create_dir_all(&dir)?;
885
886        if meta.is_delete_marker {
887            crate::atomic::write_atomic_toml(&toml_path, meta)?;
888            return Ok(BodyRef::Memory(Bytes::new()));
889        }
890
891        let size: u64;
892        let bytes_for_cache: Option<Bytes>;
893        match body {
894            BodySource::Bytes(b) => {
895                size = b.len() as u64;
896                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
897                bytes_for_cache = Some(b);
898            }
899            BodySource::File(src) => {
900                let src_size = std::fs::metadata(&src)?.len();
901                size = src_size;
902                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
903                bytes_for_cache = None;
904            }
905            BodySource::FileCopy(src) => {
906                let src_size = std::fs::metadata(&src)?.len();
907                size = src_size;
908                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
909                bytes_for_cache = None;
910            }
911        }
912
913        crate::atomic::write_atomic_toml(&toml_path, meta)?;
914
915        let body_key = crate::cache::BodyKey::new(
916            bucket.to_string(),
917            key.to_string(),
918            version.map(|s| s.to_string()),
919        );
920        if let Some(b) = bytes_for_cache {
921            self.cache.insert(body_key, b);
922        } else {
923            self.cache.invalidate(&crate::cache::BodyKey::new(
924                bucket.to_string(),
925                key.to_string(),
926                version.map(|s| s.to_string()),
927            ));
928        }
929
930        Ok(BodyRef::Disk {
931            bucket: bucket.to_string(),
932            key: key.to_string(),
933            version: version.map(|s| s.to_string()),
934            path: bin_path,
935            size,
936        })
937    }
938
939    fn put_object_meta(
940        &self,
941        bucket: &str,
942        key: &str,
943        version: Option<&str>,
944        meta: &ObjectMeta,
945    ) -> StoreResult<()> {
946        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
947        std::fs::create_dir_all(&dir)?;
948        crate::atomic::write_atomic_toml(&toml_path, meta)?;
949        Ok(())
950    }
951
952    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
953        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
954        for p in [&bin_path, &toml_path] {
955            match std::fs::remove_file(p) {
956                Ok(_) => {}
957                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
958                Err(e) => return Err(e.into()),
959            }
960        }
961        Self::cleanup_empty(&dir);
962
963        self.cache.invalidate(&crate::cache::BodyKey::new(
964            bucket.to_string(),
965            key.to_string(),
966            version.map(|s| s.to_string()),
967        ));
968        Ok(())
969    }
970
971    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
972        match body {
973            BodyRef::Memory(b) => Ok(b.clone()),
974            BodyRef::Disk {
975                bucket,
976                key,
977                version,
978                path,
979                size: _,
980            } => {
981                let body_key =
982                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
983                if let Some(bytes) = self.cache.get(&body_key) {
984                    return Ok(bytes);
985                }
986                let bytes = Bytes::from(std::fs::read(path)?);
987                self.cache.insert(body_key, bytes.clone());
988                Ok(bytes)
989            }
990        }
991    }
992
993    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
994        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
995        std::fs::create_dir_all(&parts_dir)?;
996        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
997        crate::atomic::write_atomic_toml(&init_path, init)?;
998        Ok(())
999    }
1000
1001    fn mpu_put_part(
1002        &self,
1003        bucket: &str,
1004        upload_id: &str,
1005        part_number: u32,
1006        body: BodySource,
1007        etag: &str,
1008    ) -> StoreResult<BodyRef> {
1009        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1010        std::fs::create_dir_all(&parts_dir)?;
1011        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1012        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1013
1014        let size: u64 = match body {
1015            BodySource::Bytes(b) => {
1016                let n = b.len() as u64;
1017                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1018                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1019                self.cache.insert(cache_key, b);
1020                n
1021            }
1022            BodySource::File(src) => {
1023                let n = std::fs::metadata(&src)?.len();
1024                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1025                self.cache
1026                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1027                n
1028            }
1029            BodySource::FileCopy(src) => {
1030                let n = std::fs::metadata(&src)?.len();
1031                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1032                self.cache
1033                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1034                n
1035            }
1036        };
1037
1038        let meta = UploadPartMeta {
1039            part_number,
1040            etag: etag.to_string(),
1041            size,
1042            last_modified: Utc::now(),
1043        };
1044        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1045        // Disk-mode parts live on disk; expose them as BodyRef::Disk so
1046        // the runtime state never holds the part body in RAM.
1047        Ok(BodyRef::Disk {
1048            bucket: bucket.to_string(),
1049            key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1050            version: None,
1051            path: bin_path,
1052            size,
1053        })
1054    }
1055
1056    fn spool_dir(&self) -> Option<std::path::PathBuf> {
1057        let dir = self.root.join(".spool");
1058        // Best-effort: create lazily; the spool helper also creates if
1059        // missing. Returning the path even when create fails is safe —
1060        // the helper handles both create and the actual file open.
1061        let _ = std::fs::create_dir_all(&dir);
1062        Some(dir)
1063    }
1064
1065    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1066        let dir = self.mpu_dir(bucket, upload_id);
1067        // Invalidate any cached part bodies for this upload.
1068        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1069            for entry in entries.flatten() {
1070                let path = entry.path();
1071                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1072                    if let Some(stem) = fname.strip_suffix(".bin") {
1073                        if let Ok(n) = stem.parse::<u32>() {
1074                            self.cache
1075                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1076                        }
1077                    }
1078                }
1079            }
1080        }
1081        match std::fs::remove_dir_all(&dir) {
1082            Ok(_) => Ok(()),
1083            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1084            Err(e) => Err(e.into()),
1085        }
1086    }
1087
1088    fn mpu_complete(
1089        &self,
1090        bucket: &str,
1091        upload_id: &str,
1092        final_key: &str,
1093        version: Option<&str>,
1094        meta: &ObjectMeta,
1095    ) -> StoreResult<BodyRef> {
1096        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1097        if !parts_dir.exists() {
1098            return Err(io_other(format!(
1099                "mpu_complete: no parts dir for upload {}",
1100                upload_id
1101            )));
1102        }
1103
1104        // Enumerate parts in ascending part-number order.
1105        let mut part_numbers: Vec<u32> = Vec::new();
1106        for entry in std::fs::read_dir(&parts_dir)? {
1107            let entry = entry?;
1108            let path = entry.path();
1109            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1110                continue;
1111            };
1112            if let Some(stem) = fname.strip_suffix(".toml") {
1113                if let Ok(n) = stem.parse::<u32>() {
1114                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1115                        part_numbers.push(n);
1116                    }
1117                }
1118            }
1119        }
1120        part_numbers.sort_unstable();
1121        if part_numbers.is_empty() {
1122            return Err(io_other(format!(
1123                "mpu_complete: upload {} has no parts",
1124                upload_id
1125            )));
1126        }
1127
1128        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1129        std::fs::create_dir_all(&dir)?;
1130
1131        let total_size: u64 = if part_numbers.len() == 1 {
1132            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1133            let sz = std::fs::metadata(&only)?.len();
1134            match std::fs::rename(&only, &bin_path) {
1135                Ok(_) => {}
1136                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1137                    // Cross-device rename: fall back to streaming copy then remove source.
1138                    {
1139                        let mut input = std::fs::File::open(&only)?;
1140                        let tmp = {
1141                            let mut os = bin_path.as_os_str().to_owned();
1142                            os.push(".tmp");
1143                            PathBuf::from(os)
1144                        };
1145                        let mut out = std::fs::OpenOptions::new()
1146                            .write(true)
1147                            .create(true)
1148                            .truncate(true)
1149                            .open(&tmp)?;
1150                        std::io::copy(&mut input, &mut out)?;
1151                        out.sync_all()?;
1152                        std::fs::rename(&tmp, &bin_path)?;
1153                    }
1154                    let _ = std::fs::remove_file(&only);
1155                }
1156                Err(e) => return Err(e.into()),
1157            }
1158            // fsync the parent directory so the rename that links the completed
1159            // object is durable, matching the multi-part branch below. Without
1160            // it a power-loss could leave a 200-OK'd single-part MPU not durably
1161            // linked, poisoning the bucket load (bug-audit 2026-06-20, 4.4).
1162            if let Some(parent) = bin_path.parent() {
1163                if let Ok(dir_handle) = std::fs::File::open(parent) {
1164                    let _ = dir_handle.sync_all();
1165                }
1166            }
1167            sz
1168        } else {
1169            let tmp = {
1170                let mut os = bin_path.as_os_str().to_owned();
1171                os.push(".tmp");
1172                PathBuf::from(os)
1173            };
1174            let mut total: u64 = 0;
1175            {
1176                let mut out = std::fs::OpenOptions::new()
1177                    .write(true)
1178                    .create(true)
1179                    .truncate(true)
1180                    .open(&tmp)?;
1181                for n in &part_numbers {
1182                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1183                    let mut input = std::fs::File::open(&part_path)?;
1184                    let copied = std::io::copy(&mut input, &mut out)?;
1185                    total += copied;
1186                }
1187                out.sync_all()?;
1188            }
1189            std::fs::rename(&tmp, &bin_path)?;
1190            if let Some(parent) = bin_path.parent() {
1191                if let Ok(dir_handle) = std::fs::File::open(parent) {
1192                    let _ = dir_handle.sync_all();
1193                }
1194            }
1195            total
1196        };
1197
1198        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1199
1200        // Invalidate per-part cache entries and drop the mpu dir.
1201        for n in &part_numbers {
1202            self.cache
1203                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1204        }
1205        let mpu_dir = self.mpu_dir(bucket, upload_id);
1206        let _ = std::fs::remove_dir_all(&mpu_dir);
1207
1208        // The concatenated body is deliberately NOT re-inserted into the cache —
1209        // large multipart uploads typically exceed the single-object cap, and we
1210        // must never round-trip the assembled body through RAM. The next
1211        // open_object_body call will load through the normal cache path.
1212        self.cache.invalidate(&crate::cache::BodyKey::new(
1213            bucket.to_string(),
1214            final_key.to_string(),
1215            version.map(|s| s.to_string()),
1216        ));
1217
1218        Ok(BodyRef::Disk {
1219            bucket: bucket.to_string(),
1220            key: final_key.to_string(),
1221            version: version.map(|s| s.to_string()),
1222            path: bin_path,
1223            size: total_size,
1224        })
1225    }
1226}
1227
/// Raw OS error code reported by `std::fs::rename` when source and
/// destination live on different filesystems / volumes.
///
/// * Windows: `ERROR_NOT_SAME_DEVICE` (17).
/// * Everything else: `EXDEV`, which is 18 on Linux, macOS and the BSDs
///   (the value this function has always returned).
///
/// Callers compare this against `io::Error::raw_os_error()` to decide
/// whether to fall back from a rename to a copy.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        // Win32 system error code; 18 there means ERROR_NO_MORE_FILES, so
        // the cross-volume fallback would never trigger.
        17
    } else {
        18
    }
}
1231
1232#[cfg(test)]
1233mod disk_tests {
1234    use super::*;
1235    use std::sync::Arc;
1236    use tempfile::TempDir;
1237
1238    fn new_store(tmp: &TempDir) -> DiskS3Store {
1239        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1240        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1241    }
1242
1243    fn new_store_with_cache(
1244        tmp: &TempDir,
1245        cap: u64,
1246    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1247        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1248        (
1249            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1250            cache,
1251        )
1252    }
1253
1254    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1255        ObjectMeta {
1256            key: key.to_string(),
1257            content_type: "application/octet-stream".to_string(),
1258            etag: "etag".to_string(),
1259            size,
1260            ..Default::default()
1261        }
1262    }
1263
/// Bucket metadata written with `put_bucket_meta` comes back from `load`.
#[test]
fn put_bucket_meta_roundtrip() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    let written = BucketMeta {
        name: "b1".to_string(),
        region: "us-east-1".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b1", &written).unwrap();

    let state = store.load().unwrap();
    let restored = &state.buckets.get("b1").unwrap().meta;
    assert_eq!(restored.name, "b1");
    assert_eq!(restored.region, "us-east-1");
    assert_eq!(restored.versioning.as_deref(), Some("Enabled"));
}
1281
/// Every listed `BucketSubresource` variant lands in its own file with the
/// payload stored verbatim.
#[test]
fn put_bucket_subresource_each_variant_writes_file() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let all_kinds = [
        BucketSubresource::Tags,
        BucketSubresource::Lifecycle,
        BucketSubresource::Cors,
        BucketSubresource::Policy,
        BucketSubresource::Notification,
        BucketSubresource::Logging,
        BucketSubresource::Website,
        BucketSubresource::PublicAccessBlock,
        BucketSubresource::ObjectLock,
        BucketSubresource::Replication,
        BucketSubresource::Ownership,
        BucketSubresource::Inventory,
        BucketSubresource::Encryption,
        BucketSubresource::Versioning,
        BucketSubresource::Acl,
        BucketSubresource::Accelerate,
    ];
    for kind in all_kinds {
        store
            .put_bucket_subresource("b", kind, "payload=true")
            .unwrap();

        let written = store
            .bucket_dir("b")
            .join(DiskS3Store::subresource_filename(kind));
        assert!(written.exists(), "{:?}", kind);
        let on_disk = std::fs::read_to_string(&written).unwrap();
        assert_eq!(on_disk, "payload=true");
    }
}
1324
/// Deleting a subresource removes its file, and deleting it again is not an
/// error.
#[test]
fn delete_bucket_subresource_removes_file() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    store
        .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
        .unwrap();
    store
        .delete_bucket_subresource("b", BucketSubresource::Tags)
        .unwrap();

    let tags_file = store.bucket_dir("b").join("tags.toml");
    assert!(!tags_file.exists());

    // Second delete of the same subresource must still succeed.
    store
        .delete_bucket_subresource("b", BucketSubresource::Tags)
        .unwrap();
}
1351
/// An in-memory body is written to disk, described by a `BodyRef::Disk`, and
/// visible again after `load`.
#[test]
fn put_object_bytes_roundtrip() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let data = Bytes::from_static(b"hello world");
    let expected_len = data.len() as u64;
    let meta = sample_meta("k1", expected_len);
    let body_ref = store
        .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
        .unwrap();

    let BodyRef::Disk {
        bucket,
        key,
        size,
        path,
        ..
    } = &body_ref
    else {
        panic!("expected Disk");
    };
    assert_eq!(bucket, "b");
    assert_eq!(key, "k1");
    assert_eq!(*size, expected_len);
    assert_eq!(std::fs::read(path).unwrap(), data.to_vec());

    let state = store.load().unwrap();
    let reloaded = state
        .buckets
        .get("b")
        .unwrap()
        .objects
        .get("k1")
        .unwrap();
    assert_eq!(reloaded.meta.size, expected_len);
}
1390
/// `BodySource::FileCopy` stores the body but leaves the source file intact.
#[test]
fn put_object_file_copy_source_leaves_src_in_place() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Source lives in its own temp dir, outside the store root.
    let src_dir = TempDir::new().unwrap();
    let src = src_dir.path().join("src.bin");
    std::fs::write(&src, b"copied-body").unwrap();

    let bref = store
        .put_object(
            "b",
            "k",
            None,
            BodySource::FileCopy(src.clone()),
            &sample_meta("k", 11),
        )
        .unwrap();

    let BodyRef::Disk { path, size, .. } = bref else {
        panic!("expected Disk bodyref");
    };
    assert_eq!(size, 11);
    assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");

    assert!(src.exists(), "source file must not be moved by FileCopy");
    assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
}
1421
/// `BodySource::File` results in a disk body holding the source's content.
#[test]
fn put_object_file_source() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let src = tmp.path().join("src.bin");
    std::fs::write(&src, b"file-body").unwrap();

    let stored = store
        .put_object(
            "b",
            "k",
            None,
            BodySource::File(src.clone()),
            &sample_meta("k", 9),
        )
        .unwrap();
    let BodyRef::Disk { path, .. } = stored else {
        panic!();
    };
    assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
}
1447
/// A metadata-only update leaves the body file untouched while the new
/// metadata shows up after `load`.
#[test]
fn put_object_meta_only_keeps_bin() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let data = Bytes::from_static(b"abc");
    let mut meta = sample_meta("k", 3);
    store
        .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
        .unwrap();

    let (_, body_file, _) = store.object_paths("b", "k", None);
    let body_before = std::fs::read(&body_file).unwrap();

    meta.tags.insert("x".to_string(), "y".to_string());
    store.put_object_meta("b", "k", None, &meta).unwrap();

    let body_after = std::fs::read(&body_file).unwrap();
    assert_eq!(body_after, body_before);

    let state = store.load().unwrap();
    let reloaded = state.buckets.get("b").unwrap().objects.get("k").unwrap();
    assert_eq!(reloaded.meta.tags.get("x").map(String::as_str), Some("y"));
}
1475
/// Deleting an object removes its body, metadata and directory, and evicts
/// the cached body.
#[test]
fn delete_object_cleans_up_files_and_cache() {
    let tmp = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"bye");
    store
        .put_object(
            "b",
            "k",
            None,
            BodySource::Bytes(payload),
            &sample_meta("k", 3),
        )
        .unwrap();

    // The put seeded the cache.
    let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    assert!(cache.get(&body_key).is_some());

    store.delete_object("b", "k", None).unwrap();

    let (dir, bin, toml_path) = store.object_paths("b", "k", None);
    for gone in [&bin, &toml_path, &dir] {
        assert!(!gone.exists());
    }
    assert!(cache.get(&body_key).is_none());
}
1503
/// `open_object_body` serves from the cache when possible and repopulates
/// the cache from disk after an invalidation.
#[test]
fn open_object_body_cache_hit_and_refill() {
    let tmp = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let data = Bytes::from_static(b"payload");
    let body_ref = store
        .put_object(
            "b",
            "k",
            None,
            BodySource::Bytes(data.clone()),
            &sample_meta("k", data.len() as u64),
        )
        .unwrap();

    // First read: the put already cached the body.
    let first_read = store.open_object_body(&body_ref).unwrap();
    assert_eq!(first_read, data);

    // Evict, then read again: the body must come from disk and be re-cached.
    let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    cache.invalidate(&body_key);
    assert!(cache.get(&body_key).is_none());

    let second_read = store.open_object_body(&body_ref).unwrap();
    assert_eq!(second_read, data);
    assert!(cache.get(&body_key).is_some());
}
1533
/// A body larger than the cache's single-object cap is readable but never
/// ends up in the cache.
#[test]
fn open_object_body_large_bypasses_cache() {
    let tmp = TempDir::new().unwrap();
    // capacity 1024 → single-object cap 512. Use 800-byte body.
    let (store, cache) = new_store_with_cache(&tmp, 1024);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let oversized = Bytes::from(vec![7u8; 800]);
    let body_ref = store
        .put_object(
            "b",
            "big",
            None,
            BodySource::Bytes(oversized.clone()),
            &sample_meta("big", 800),
        )
        .unwrap();

    let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
    assert!(cache.get(&body_key).is_none());

    let read_back = store.open_object_body(&body_ref).unwrap();
    assert_eq!(read_back, oversized);
    // Still none — exceeds single-object cap.
    assert!(cache.get(&body_key).is_none());
}
1560
/// Loading from an empty root yields a state without buckets.
#[test]
fn load_empty_dir() {
    let tmp = TempDir::new().unwrap();
    let state = new_store(&tmp).load().unwrap();
    assert!(state.buckets.is_empty());
}
1568
/// An upload directory lacking `init.toml` is ignored by `load`, while the
/// bucket's regular objects are still loaded.
#[test]
fn load_skips_mpu_without_init() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    store
        .put_object(
            "b",
            "k",
            None,
            BodySource::Bytes(Bytes::from_static(b"x")),
            &sample_meta("k", 1),
        )
        .unwrap();

    // Directory with no init.toml is skipped by load.
    let orphan_upload = store.bucket_dir("b").join("mpu").join("upload1");
    std::fs::create_dir_all(&orphan_upload).unwrap();

    let state = store.load().unwrap();
    let snap = state.buckets.get("b").unwrap();
    assert_eq!(snap.objects.len(), 1);
    assert!(snap.multipart_uploads.is_empty());
}
1596
/// Subresource payloads written to a bucket are returned by `load`, keyed by
/// their file names.
#[test]
fn load_reads_bucket_subresources() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // (kind, file name expected as the snapshot key, payload)
    let cases = [
        (BucketSubresource::Lifecycle, "lifecycle.toml", "<Lifecycle/>"),
        (BucketSubresource::Cors, "cors.toml", "<Cors/>"),
        (BucketSubresource::Policy, "policy.toml", "{}"),
        (BucketSubresource::Tags, "tags.toml", "x=1"),
    ];
    for &(kind, _, payload) in &cases {
        store.put_bucket_subresource("b", kind, payload).unwrap();
    }

    let state = store.load().unwrap();
    let snap = state.buckets.get("b").unwrap();
    for &(_, file_name, payload) in &cases {
        assert_eq!(
            snap.subresources.get(file_name).map(String::as_str),
            Some(payload),
        );
    }
}
1641
/// Three versions of the same key, written with increasing `last_modified`,
/// come back from `load` in write order and as disk-backed bodies.
#[test]
fn versioned_put_load_roundtrip() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let base = chrono::Utc::now();
    let writes = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (offset, &(vid, body)) in writes.iter().enumerate() {
        let mut m = sample_meta("k", body.len() as u64);
        m.version_id = Some(vid.to_string());
        // One second apart so the versions have distinct timestamps.
        m.last_modified = base + chrono::Duration::seconds(offset as i64);
        let payload = Bytes::copy_from_slice(body.as_bytes());
        store
            .put_object("b", "k", Some(vid), BodySource::Bytes(payload), &m)
            .unwrap();
    }

    let state = store.load().unwrap();
    let snap = state.buckets.get("b").unwrap();
    let versions = snap.object_versions.get("k").expect("versions present");
    assert_eq!(versions.len(), 3);

    let loaded_ids: Vec<Option<&str>> = versions
        .iter()
        .map(|v| v.meta.version_id.as_deref())
        .collect();
    assert_eq!(loaded_ids, [Some("v1"), Some("v2"), Some("v3")]);

    for v in versions {
        let BodyRef::Disk { size, .. } = &v.body else {
            panic!("expected Disk body");
        };
        assert!(*size > 0);
    }
}
1688
1689    #[test]
1690    fn versioned_load_promotes_latest_live_to_snap_objects() {
1691        let tmp = TempDir::new().unwrap();
1692        let store = new_store(&tmp);
1693        store
1694            .put_bucket_meta(
1695                "b",
1696                &BucketMeta {
1697                    name: "b".to_string(),
1698                    versioning: Some("Enabled".to_string()),
1699                    ..Default::default()
1700                },
1701            )
1702            .unwrap();
1703        let base = chrono::Utc::now();
1704        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1705            .iter()
1706            .enumerate()
1707        {
1708            let mut m = sample_meta("k", body.len() as u64);
1709            m.version_id = Some((*vid).to_string());
1710            m.last_modified = base + chrono::Duration::seconds(i as i64);
1711            store
1712                .put_object(
1713                    "b",
1714                    "k",
1715                    Some(*vid),
1716                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1717                    &m,
1718                )
1719                .unwrap();
1720        }
1721        let loaded = store.load().unwrap();
1722        let snap = loaded.buckets.get("b").unwrap();
1723        let current = snap.objects.get("k").expect("current object promoted");
1724        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1725    }
1726
1727    #[test]
1728    fn versioned_load_trailing_delete_marker_hides_current() {
1729        let tmp = TempDir::new().unwrap();
1730        let store = new_store(&tmp);
1731        store
1732            .put_bucket_meta(
1733                "b",
1734                &BucketMeta {
1735                    name: "b".to_string(),
1736                    versioning: Some("Enabled".to_string()),
1737                    ..Default::default()
1738                },
1739            )
1740            .unwrap();
1741        let base = chrono::Utc::now();
1742        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1743            .iter()
1744            .enumerate()
1745        {
1746            let mut m = sample_meta("k", body.len() as u64);
1747            m.version_id = Some((*vid).to_string());
1748            m.last_modified = base + chrono::Duration::seconds(i as i64);
1749            store
1750                .put_object(
1751                    "b",
1752                    "k",
1753                    Some(*vid),
1754                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1755                    &m,
1756                )
1757                .unwrap();
1758        }
1759        // Append a delete marker on top.
1760        let mut dm = sample_meta("k", 0);
1761        dm.version_id = Some("dm1".to_string());
1762        dm.is_delete_marker = true;
1763        dm.last_modified = base + chrono::Duration::seconds(10);
1764        store
1765            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1766            .unwrap();
1767        let loaded = store.load().unwrap();
1768        let snap = loaded.buckets.get("b").unwrap();
1769        assert!(
1770            !snap.objects.contains_key("k"),
1771            "trailing delete marker must hide current object",
1772        );
1773        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1774    }
1775
1776    #[test]
1777    fn legacy_null_object_overridden_by_newer_versions() {
1778        // A pre-versioning null put followed by versioning-enabled puts must
1779        // see snap.objects reflect the newest live version, not the stale null.
1780        let tmp = TempDir::new().unwrap();
1781        let store = new_store(&tmp);
1782        store
1783            .put_bucket_meta(
1784                "b",
1785                &BucketMeta {
1786                    name: "b".to_string(),
1787                    ..Default::default()
1788                },
1789            )
1790            .unwrap();
1791        let base = chrono::Utc::now();
1792        let mut null_meta = sample_meta("k", 3);
1793        null_meta.last_modified = base;
1794        store
1795            .put_object(
1796                "b",
1797                "k",
1798                None,
1799                BodySource::Bytes(Bytes::from_static(b"old")),
1800                &null_meta,
1801            )
1802            .unwrap();
1803        // Enable versioning and put two versions, the second a delete.
1804        store
1805            .put_bucket_meta(
1806                "b",
1807                &BucketMeta {
1808                    name: "b".to_string(),
1809                    versioning: Some("Enabled".to_string()),
1810                    ..Default::default()
1811                },
1812            )
1813            .unwrap();
1814        let mut v1 = sample_meta("k", 3);
1815        v1.version_id = Some("v1".to_string());
1816        v1.last_modified = base + chrono::Duration::seconds(1);
1817        store
1818            .put_object(
1819                "b",
1820                "k",
1821                Some("v1"),
1822                BodySource::Bytes(Bytes::from_static(b"new")),
1823                &v1,
1824            )
1825            .unwrap();
1826        let mut v2 = sample_meta("k", 5);
1827        v2.version_id = Some("v2".to_string());
1828        v2.last_modified = base + chrono::Duration::seconds(2);
1829        store
1830            .put_object(
1831                "b",
1832                "k",
1833                Some("v2"),
1834                BodySource::Bytes(Bytes::from_static(b"newer")),
1835                &v2,
1836            )
1837            .unwrap();
1838        let loaded = store.load().unwrap();
1839        let snap = loaded.buckets.get("b").unwrap();
1840        let current = snap
1841            .objects
1842            .get("k")
1843            .expect("latest live version must override stale null");
1844        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1845        assert_eq!(current.meta.size, 5);
1846    }
1847
1848    #[test]
1849    fn legacy_null_hidden_by_trailing_delete_marker() {
1850        let tmp = TempDir::new().unwrap();
1851        let store = new_store(&tmp);
1852        store
1853            .put_bucket_meta(
1854                "b",
1855                &BucketMeta {
1856                    name: "b".to_string(),
1857                    ..Default::default()
1858                },
1859            )
1860            .unwrap();
1861        let base = chrono::Utc::now();
1862        let mut null_meta = sample_meta("k", 3);
1863        null_meta.last_modified = base;
1864        store
1865            .put_object(
1866                "b",
1867                "k",
1868                None,
1869                BodySource::Bytes(Bytes::from_static(b"old")),
1870                &null_meta,
1871            )
1872            .unwrap();
1873        store
1874            .put_bucket_meta(
1875                "b",
1876                &BucketMeta {
1877                    name: "b".to_string(),
1878                    versioning: Some("Enabled".to_string()),
1879                    ..Default::default()
1880                },
1881            )
1882            .unwrap();
1883        let mut v1 = sample_meta("k", 3);
1884        v1.version_id = Some("v1".to_string());
1885        v1.last_modified = base + chrono::Duration::seconds(1);
1886        store
1887            .put_object(
1888                "b",
1889                "k",
1890                Some("v1"),
1891                BodySource::Bytes(Bytes::from_static(b"new")),
1892                &v1,
1893            )
1894            .unwrap();
1895        let mut dm = sample_meta("k", 0);
1896        dm.version_id = Some("dm1".to_string());
1897        dm.is_delete_marker = true;
1898        dm.last_modified = base + chrono::Duration::seconds(2);
1899        store
1900            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1901            .unwrap();
1902        let loaded = store.load().unwrap();
1903        let snap = loaded.buckets.get("b").unwrap();
1904        assert!(
1905            !snap.objects.contains_key("k"),
1906            "trailing delete marker must hide even a legacy null object",
1907        );
1908    }
1909
1910    #[test]
1911    fn delete_marker_roundtrip_no_body_file() {
1912        let tmp = TempDir::new().unwrap();
1913        let store = new_store(&tmp);
1914        store
1915            .put_bucket_meta(
1916                "b",
1917                &BucketMeta {
1918                    name: "b".to_string(),
1919                    versioning: Some("Enabled".to_string()),
1920                    ..Default::default()
1921                },
1922            )
1923            .unwrap();
1924        let mut m = sample_meta("k", 0);
1925        m.version_id = Some("dm1".to_string());
1926        m.is_delete_marker = true;
1927        store
1928            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1929            .unwrap();
1930        // No .bin file on disk for delete markers.
1931        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1932        assert!(!bin.exists(), "delete marker must not have a .bin file");
1933        assert!(toml_path.exists());
1934
1935        let loaded = store.load().unwrap();
1936        let versions = loaded
1937            .buckets
1938            .get("b")
1939            .unwrap()
1940            .object_versions
1941            .get("k")
1942            .unwrap();
1943        assert_eq!(versions.len(), 1);
1944        assert!(versions[0].meta.is_delete_marker);
1945        match &versions[0].body {
1946            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1947            _ => panic!("delete marker body should be empty Memory"),
1948        }
1949    }
1950
1951    #[test]
1952    fn mixed_nonversioned_and_versioned_buckets() {
1953        let tmp = TempDir::new().unwrap();
1954        let store = new_store(&tmp);
1955        store
1956            .put_bucket_meta(
1957                "a",
1958                &BucketMeta {
1959                    name: "a".to_string(),
1960                    ..Default::default()
1961                },
1962            )
1963            .unwrap();
1964        store
1965            .put_bucket_meta(
1966                "b",
1967                &BucketMeta {
1968                    name: "b".to_string(),
1969                    versioning: Some("Enabled".to_string()),
1970                    ..Default::default()
1971                },
1972            )
1973            .unwrap();
1974        let ma = sample_meta("only", 3);
1975        store
1976            .put_object(
1977                "a",
1978                "only",
1979                None,
1980                BodySource::Bytes(Bytes::from_static(b"aaa")),
1981                &ma,
1982            )
1983            .unwrap();
1984        let base = chrono::Utc::now();
1985        for (i, vid) in ["v1", "v2"].iter().enumerate() {
1986            let mut m = sample_meta("twice", 2);
1987            m.version_id = Some((*vid).to_string());
1988            m.last_modified = base + chrono::Duration::seconds(i as i64);
1989            store
1990                .put_object(
1991                    "b",
1992                    "twice",
1993                    Some(*vid),
1994                    BodySource::Bytes(Bytes::from_static(b"xx")),
1995                    &m,
1996                )
1997                .unwrap();
1998        }
1999        let loaded = store.load().unwrap();
2000        assert_eq!(loaded.buckets.len(), 2);
2001        let a = loaded.buckets.get("a").unwrap();
2002        assert_eq!(a.objects.len(), 1);
2003        assert!(a.object_versions.is_empty());
2004        let b = loaded.buckets.get("b").unwrap();
2005        // Fix #5: the latest live version is promoted into objects so
2006        // unversioned GETs see it post-restart.
2007        assert_eq!(
2008            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2009            Some("v2")
2010        );
2011        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2012    }
2013
2014    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2015        MpuInit {
2016            upload_id: upload_id.to_string(),
2017            key: key.to_string(),
2018            initiated: chrono::Utc::now(),
2019            content_type: "application/octet-stream".to_string(),
2020            storage_class: "STANDARD".to_string(),
2021            ..Default::default()
2022        }
2023    }
2024
2025    #[test]
2026    fn mpu_create_then_load_empty_parts() {
2027        let tmp = TempDir::new().unwrap();
2028        let store = new_store(&tmp);
2029        store
2030            .put_bucket_meta(
2031                "b",
2032                &BucketMeta {
2033                    name: "b".to_string(),
2034                    ..Default::default()
2035                },
2036            )
2037            .unwrap();
2038        let init = sample_mpu_init("up1", "k1");
2039        store.mpu_create("b", "up1", &init).unwrap();
2040        let loaded = store.load().unwrap();
2041        let snap = loaded.buckets.get("b").unwrap();
2042        let m = snap.multipart_uploads.get("up1").expect("upload present");
2043        assert_eq!(m.init.upload_id, "up1");
2044        assert_eq!(m.init.key, "k1");
2045        assert!(m.parts.is_empty());
2046    }
2047
2048    #[test]
2049    fn mpu_put_part_then_load_three_parts() {
2050        let tmp = TempDir::new().unwrap();
2051        let store = new_store(&tmp);
2052        store
2053            .put_bucket_meta(
2054                "b",
2055                &BucketMeta {
2056                    name: "b".to_string(),
2057                    ..Default::default()
2058                },
2059            )
2060            .unwrap();
2061        store
2062            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2063            .unwrap();
2064        let bodies = [
2065            Bytes::from_static(b"part-one"),
2066            Bytes::from_static(b"part-two-longer"),
2067            Bytes::from_static(b"p3"),
2068        ];
2069        for (i, body) in bodies.iter().enumerate() {
2070            let n = (i + 1) as u32;
2071            store
2072                .mpu_put_part(
2073                    "b",
2074                    "up",
2075                    n,
2076                    BodySource::Bytes(body.clone()),
2077                    &format!("et{}", n),
2078                )
2079                .unwrap();
2080        }
2081        let loaded = store.load().unwrap();
2082        let snap = loaded.buckets.get("b").unwrap();
2083        let m = snap.multipart_uploads.get("up").unwrap();
2084        assert_eq!(m.parts.len(), 3);
2085        for (i, body) in bodies.iter().enumerate() {
2086            let n = (i + 1) as u32;
2087            let part = m.parts.get(&n).unwrap();
2088            assert_eq!(part.meta.part_number, n);
2089            assert_eq!(part.meta.size, body.len() as u64);
2090            assert_eq!(part.meta.etag, format!("et{}", n));
2091            match &part.body {
2092                BodyRef::Disk { path, size, .. } => {
2093                    assert_eq!(*size, body.len() as u64);
2094                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2095                }
2096                _ => panic!("expected Disk"),
2097            }
2098        }
2099    }
2100
2101    #[test]
2102    fn mpu_abort_removes_upload() {
2103        let tmp = TempDir::new().unwrap();
2104        let store = new_store(&tmp);
2105        store
2106            .put_bucket_meta(
2107                "b",
2108                &BucketMeta {
2109                    name: "b".to_string(),
2110                    ..Default::default()
2111                },
2112            )
2113            .unwrap();
2114        store
2115            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2116            .unwrap();
2117        store
2118            .mpu_put_part(
2119                "b",
2120                "up",
2121                1,
2122                BodySource::Bytes(Bytes::from_static(b"x")),
2123                "e",
2124            )
2125            .unwrap();
2126        store.mpu_abort("b", "up").unwrap();
2127        assert!(!store.mpu_dir("b", "up").exists());
2128        // Idempotent.
2129        store.mpu_abort("b", "up").unwrap();
2130        let loaded = store.load().unwrap();
2131        let snap = loaded.buckets.get("b").unwrap();
2132        assert!(snap.multipart_uploads.is_empty());
2133    }
2134
2135    #[test]
2136    fn mpu_complete_single_part_fast_path() {
2137        let tmp = TempDir::new().unwrap();
2138        let store = new_store(&tmp);
2139        store
2140            .put_bucket_meta(
2141                "b",
2142                &BucketMeta {
2143                    name: "b".to_string(),
2144                    ..Default::default()
2145                },
2146            )
2147            .unwrap();
2148        store
2149            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2150            .unwrap();
2151        let body = Bytes::from_static(b"only-part-bytes");
2152        store
2153            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2154            .unwrap();
2155        let meta = sample_meta("k", body.len() as u64);
2156        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2157        match &body_ref {
2158            BodyRef::Disk { path, size, .. } => {
2159                assert_eq!(*size, body.len() as u64);
2160                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2161            }
2162            _ => panic!("expected Disk"),
2163        }
2164        assert!(!store.mpu_dir("b", "up").exists());
2165    }
2166
2167    #[test]
2168    fn mpu_complete_multi_part_concat() {
2169        let tmp = TempDir::new().unwrap();
2170        let store = new_store(&tmp);
2171        store
2172            .put_bucket_meta(
2173                "b",
2174                &BucketMeta {
2175                    name: "b".to_string(),
2176                    ..Default::default()
2177                },
2178            )
2179            .unwrap();
2180        store
2181            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2182            .unwrap();
2183        let p1 = Bytes::from_static(b"AAAA");
2184        let p2 = Bytes::from_static(b"BBBBBB");
2185        let p3 = Bytes::from_static(b"CC");
2186        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2187            store
2188                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2189                .unwrap();
2190        }
2191        let mut expected = Vec::new();
2192        expected.extend_from_slice(&p1);
2193        expected.extend_from_slice(&p2);
2194        expected.extend_from_slice(&p3);
2195        let meta = sample_meta("k", expected.len() as u64);
2196        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2197        let path = match body_ref {
2198            BodyRef::Disk { path, size, .. } => {
2199                assert_eq!(size, expected.len() as u64);
2200                path
2201            }
2202            _ => panic!("expected Disk"),
2203        };
2204        assert_eq!(std::fs::read(&path).unwrap(), expected);
2205        assert!(!store.mpu_dir("b", "up").exists());
2206    }
2207
2208    #[test]
2209    fn mpu_complete_large_streaming_via_file_source() {
2210        let tmp = TempDir::new().unwrap();
2211        let store = new_store(&tmp);
2212        store
2213            .put_bucket_meta(
2214                "b",
2215                &BucketMeta {
2216                    name: "b".to_string(),
2217                    ..Default::default()
2218                },
2219            )
2220            .unwrap();
2221        store
2222            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2223            .unwrap();
2224
2225        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2226        // exercise the BodySource::File + streaming concat path).
2227        const PART_SIZE: usize = 1024 * 1024;
2228        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2229        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2230        for (i, byte) in patterns.iter().enumerate() {
2231            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2232            let data = vec![*byte; PART_SIZE];
2233            std::fs::write(&src, &data).unwrap();
2234            expected.extend_from_slice(&data);
2235            store
2236                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2237                .unwrap();
2238        }
2239        let meta = sample_meta("k", expected.len() as u64);
2240        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2241        let path = match body_ref {
2242            BodyRef::Disk { path, size, .. } => {
2243                assert_eq!(size, expected.len() as u64);
2244                path
2245            }
2246            _ => panic!("expected Disk"),
2247        };
2248        let actual = std::fs::read(&path).unwrap();
2249        assert_eq!(actual.len(), expected.len());
2250        assert_eq!(actual, expected);
2251        assert!(!store.mpu_dir("b", "up").exists());
2252    }
2253
2254    #[test]
2255    fn mpu_resumable_across_load() {
2256        let tmp = TempDir::new().unwrap();
2257        let store = new_store(&tmp);
2258        store
2259            .put_bucket_meta(
2260                "b",
2261                &BucketMeta {
2262                    name: "b".to_string(),
2263                    ..Default::default()
2264                },
2265            )
2266            .unwrap();
2267        store
2268            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2269            .unwrap();
2270        let p1 = Bytes::from_static(b"hello-");
2271        let p2 = Bytes::from_static(b"world-");
2272        store
2273            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2274            .unwrap();
2275        store
2276            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2277            .unwrap();
2278
2279        // Simulate a restart: re-open the store on the same dir and load.
2280        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2281        let loaded = store2.load().unwrap();
2282        let snap = loaded.buckets.get("b").unwrap();
2283        let m = snap.multipart_uploads.get("up").unwrap();
2284        assert_eq!(m.parts.len(), 2);
2285        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2286        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2287
2288        // Continue the upload on the fresh store.
2289        let p3 = Bytes::from_static(b"again!");
2290        store2
2291            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2292            .unwrap();
2293        let mut expected = Vec::new();
2294        expected.extend_from_slice(&p1);
2295        expected.extend_from_slice(&p2);
2296        expected.extend_from_slice(&p3);
2297        let meta = sample_meta("k", expected.len() as u64);
2298        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2299        let path = match body_ref {
2300            BodyRef::Disk { path, .. } => path,
2301            _ => panic!(),
2302        };
2303        assert_eq!(std::fs::read(&path).unwrap(), expected);
2304        assert!(!store2.mpu_dir("b", "up").exists());
2305    }
2306
2307    #[test]
2308    fn tags_snapshot_roundtrip_via_store() {
2309        let tmp = TempDir::new().unwrap();
2310        let store = new_store(&tmp);
2311        store
2312            .put_bucket_meta(
2313                "b",
2314                &BucketMeta {
2315                    name: "b".to_string(),
2316                    ..Default::default()
2317                },
2318            )
2319            .unwrap();
2320        let mut tags = BTreeMap::new();
2321        tags.insert("env".to_string(), "prod".to_string());
2322        tags.insert("team".to_string(), "s3".to_string());
2323        let snap = TagsSnapshot { tags: tags.clone() };
2324        let payload = toml::to_string(&snap).unwrap();
2325        store
2326            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2327            .unwrap();
2328        let loaded = store.load().unwrap();
2329        let text = loaded
2330            .buckets
2331            .get("b")
2332            .unwrap()
2333            .subresources
2334            .get("tags.toml")
2335            .cloned()
2336            .unwrap();
2337        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2338        assert_eq!(decoded.tags, tags);
2339    }
2340
2341    #[test]
2342    fn acl_snapshot_roundtrip_via_store() {
2343        let tmp = TempDir::new().unwrap();
2344        let store = new_store(&tmp);
2345        store
2346            .put_bucket_meta(
2347                "b",
2348                &BucketMeta {
2349                    name: "b".to_string(),
2350                    ..Default::default()
2351                },
2352            )
2353            .unwrap();
2354        let snap = AclSnapshot {
2355            owner_id: "owner-abc".to_string(),
2356            grants: vec![
2357                AclGrantSnapshot {
2358                    grantee_type: "CanonicalUser".to_string(),
2359                    grantee_id: Some("owner-abc".to_string()),
2360                    grantee_display_name: Some("owner".to_string()),
2361                    grantee_uri: None,
2362                    permission: "FULL_CONTROL".to_string(),
2363                },
2364                AclGrantSnapshot {
2365                    grantee_type: "Group".to_string(),
2366                    grantee_id: None,
2367                    grantee_display_name: None,
2368                    grantee_uri: Some(
2369                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2370                    ),
2371                    permission: "READ".to_string(),
2372                },
2373            ],
2374        };
2375        let payload = toml::to_string(&snap).unwrap();
2376        store
2377            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2378            .unwrap();
2379        let loaded = store.load().unwrap();
2380        let text = loaded
2381            .buckets
2382            .get("b")
2383            .unwrap()
2384            .subresources
2385            .get("acl.toml")
2386            .cloned()
2387            .unwrap();
2388        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2389        assert_eq!(decoded.owner_id, "owner-abc");
2390        assert_eq!(decoded.grants.len(), 2);
2391        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2392        assert_eq!(decoded.grants[1].grantee_type, "Group");
2393    }
2394
2395    #[test]
2396    fn inventory_snapshot_roundtrip_via_store() {
2397        let tmp = TempDir::new().unwrap();
2398        let store = new_store(&tmp);
2399        store
2400            .put_bucket_meta(
2401                "b",
2402                &BucketMeta {
2403                    name: "b".to_string(),
2404                    ..Default::default()
2405                },
2406            )
2407            .unwrap();
2408        let mut configs = BTreeMap::new();
2409        configs.insert(
2410            "inv-1".to_string(),
2411            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2412        );
2413        configs.insert(
2414            "inv-2".to_string(),
2415            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2416        );
2417        let snap = InventorySnapshot {
2418            configs: configs.clone(),
2419        };
2420        let payload = toml::to_string(&snap).unwrap();
2421        store
2422            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2423            .unwrap();
2424        let loaded = store.load().unwrap();
2425        let text = loaded
2426            .buckets
2427            .get("b")
2428            .unwrap()
2429            .subresources
2430            .get("inventory.toml")
2431            .cloned()
2432            .unwrap();
2433        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2434        assert_eq!(decoded.configs, configs);
2435    }
2436
2437    #[test]
2438    fn legacy_versioning_file_is_read() {
2439        let tmp = TempDir::new().unwrap();
2440        let store = new_store(&tmp);
2441        // Bucket meta with no versioning field set.
2442        store
2443            .put_bucket_meta(
2444                "b",
2445                &BucketMeta {
2446                    name: "b".to_string(),
2447                    ..Default::default()
2448                },
2449            )
2450            .unwrap();
2451        // Legacy sidecar: a bare versioning.toml with "Enabled".
2452        let path = store.bucket_dir("b").join("versioning.toml");
2453        std::fs::write(&path, "Enabled").unwrap();
2454        let loaded = store.load().unwrap();
2455        let snap = loaded.buckets.get("b").unwrap();
2456        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2457    }
2458
2459    #[test]
2460    fn load_skips_corrupt_bucket_and_keeps_others() {
2461        // One corrupt bucket must be skipped with a warning, not abort the
2462        // whole store load (bug-audit 2026-06-20, 4.3).
2463        let tmp = TempDir::new().unwrap();
2464        let store = new_store(&tmp);
2465        store
2466            .put_bucket_meta(
2467                "good",
2468                &BucketMeta {
2469                    name: "good".to_string(),
2470                    ..Default::default()
2471                },
2472            )
2473            .unwrap();
2474        store
2475            .put_object(
2476                "good",
2477                "k",
2478                None,
2479                BodySource::Bytes(Bytes::from_static(b"hi")),
2480                &sample_meta("k", 2),
2481            )
2482            .unwrap();
2483
2484        let bad_dir = store.buckets_dir().join("bad");
2485        std::fs::create_dir_all(&bad_dir).unwrap();
2486        std::fs::write(bad_dir.join("meta.toml"), b"not valid toml = = =").unwrap();
2487
2488        let loaded = store.load().unwrap();
2489        assert!(
2490            loaded.buckets.contains_key("good"),
2491            "valid bucket must load"
2492        );
2493        assert!(
2494            !loaded.buckets.contains_key("bad"),
2495            "corrupt bucket must be skipped, not abort the load"
2496        );
2497    }
2498}