Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11    #[serde(default)]
12    pub account_id: String,
13    #[serde(default)]
14    pub region: String,
15    #[serde(default)]
16    pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
/// One bucket's persisted state: metadata, objects, raw subresource payloads
/// and in-flight multipart uploads.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket metadata. Required when deserializing (no serde default).
    pub meta: BucketMeta,
    /// Object visible at each key, keyed by object key. `DiskS3Store::load`
    /// fills this with the `null` version, or for versioned keys with the
    /// newest version unless that newest version is a delete marker.
    #[serde(default)]
    pub objects: BTreeMap<String, LoadedObject>,
    /// Versioned entries per object key, sorted oldest-first by
    /// `last_modified` when built by `DiskS3Store::load`.
    #[serde(default)]
    pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
    /// Raw subresource payloads keyed by their file name (e.g. `cors.toml`).
    #[serde(default)]
    pub subresources: BTreeMap<String, String>,
    /// In-flight multipart uploads keyed by upload id.
    #[serde(default)]
    pub multipart_uploads: BTreeMap<String, LoadedMpu>,
}
31
/// A persisted multipart upload: the parameters it was created with plus the
/// parts uploaded so far.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Creation parameters (`init.toml` in the disk store). Required when
    /// deserializing.
    pub init: MpuInit,
    /// Uploaded parts keyed by part number.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One uploaded multipart part: its metadata and where its bytes live.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata. Required when deserializing.
    pub meta: UploadPartMeta,
    /// Part body; falls back to an empty in-memory body when absent.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One object version: its metadata and where its bytes live.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata. Required when deserializing.
    pub meta: ObjectMeta,
    /// Object body; falls back to an empty in-memory body when absent
    /// (`DiskS3Store::load` also uses an empty body for delete markers).
    #[serde(default)]
    pub body: BodyRef,
}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55    #[serde(default)]
56    pub name: String,
57    #[serde(default = "default_time")]
58    pub creation_date: DateTime<Utc>,
59    #[serde(default)]
60    pub region: String,
61    #[serde(default)]
62    pub versioning: Option<String>,
63    /// MFA-Delete status ("Enabled"/"Disabled") set via PutBucketVersioning.
64    #[serde(default)]
65    pub mfa_delete: Option<String>,
66    #[serde(default)]
67    pub acl: Option<String>,
68    #[serde(default)]
69    pub acl_owner_id: String,
70    #[serde(default)]
71    pub accelerate_status: Option<String>,
72    #[serde(default)]
73    pub eventbridge_enabled: bool,
74    #[serde(default)]
75    pub lifecycle_transition_default_min_size: Option<String>,
76}
77
78fn default_time() -> DateTime<Utc> {
79    DateTime::<Utc>::MIN_UTC
80}
81
/// One persisted ACL grant. `grantee_type` and `permission` are required when
/// deserializing; the grantee identifiers are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Kind of grantee. NOTE(review): presumably the S3 grantee type
    /// (e.g. `CanonicalUser`, `Group`) — confirm against the service.
    pub grantee_type: String,
    #[serde(default)]
    pub grantee_id: Option<String>,
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    /// NOTE(review): presumably set for group grantees — confirm.
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Granted permission. NOTE(review): presumably an S3 permission name
    /// such as `READ` or `FULL_CONTROL` — confirm.
    pub permission: String,
}
93
94#[derive(Debug, Default, Clone, Serialize, Deserialize)]
95pub struct TagsSnapshot {
96    #[serde(default)]
97    pub tags: BTreeMap<String, String>,
98}
99
100#[derive(Debug, Default, Clone, Serialize, Deserialize)]
101pub struct AclSnapshot {
102    #[serde(default)]
103    pub owner_id: String,
104    #[serde(default)]
105    pub grants: Vec<AclGrantSnapshot>,
106}
107
108#[derive(Debug, Default, Clone, Serialize, Deserialize)]
109pub struct InventorySnapshot {
110    #[serde(default)]
111    pub configs: BTreeMap<String, String>,
112}
113
114#[derive(Debug, Default, Clone, Serialize, Deserialize)]
115pub struct ObjectMeta {
116    #[serde(default)]
117    pub key: String,
118    #[serde(default)]
119    pub content_type: String,
120    #[serde(default)]
121    pub etag: String,
122    #[serde(default)]
123    pub size: u64,
124    #[serde(default = "default_time")]
125    pub last_modified: DateTime<Utc>,
126    #[serde(default)]
127    pub metadata: BTreeMap<String, String>,
128    #[serde(default)]
129    pub tags: BTreeMap<String, String>,
130    #[serde(default)]
131    pub storage_class: String,
132    #[serde(default)]
133    pub acl_grants: Vec<AclGrantSnapshot>,
134    #[serde(default)]
135    pub acl_owner_id: Option<String>,
136    #[serde(default)]
137    pub parts_count: Option<u32>,
138    #[serde(default)]
139    pub part_sizes: Option<Vec<(u32, u64)>>,
140    #[serde(default)]
141    pub sse_algorithm: Option<String>,
142    #[serde(default)]
143    pub sse_kms_key_id: Option<String>,
144    #[serde(default)]
145    pub bucket_key_enabled: Option<bool>,
146    #[serde(default)]
147    pub version_id: Option<String>,
148    #[serde(default)]
149    pub is_delete_marker: bool,
150    #[serde(default)]
151    pub restore_ongoing: Option<bool>,
152    #[serde(default)]
153    pub restore_expiry: Option<String>,
154    #[serde(default)]
155    pub checksum_algorithm: Option<String>,
156    #[serde(default)]
157    pub checksum_value: Option<String>,
158    #[serde(default)]
159    pub lock_mode: Option<String>,
160    #[serde(default)]
161    pub lock_retain_until: Option<DateTime<Utc>>,
162    #[serde(default)]
163    pub lock_legal_hold: Option<String>,
164    #[serde(default)]
165    pub content_encoding: Option<String>,
166    #[serde(default)]
167    pub cache_control: Option<String>,
168    #[serde(default)]
169    pub content_disposition: Option<String>,
170    #[serde(default)]
171    pub content_language: Option<String>,
172    #[serde(default)]
173    pub expires: Option<String>,
174    #[serde(default)]
175    pub website_redirect_location: Option<String>,
176}
177
/// Parameters recorded when a multipart upload is created (`init.toml` in the
/// disk store). `upload_id` and `key` are required when deserializing;
/// `initiated` falls back to `default_time()`, everything else to `Default`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    pub upload_id: String,
    /// Destination object key.
    pub key: String,
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// NOTE(review): presumably the raw tagging string supplied at creation —
    /// confirm.
    #[serde(default)]
    pub tagging: Option<String>,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    #[serde(default)]
    pub cache_control: Option<String>,
    #[serde(default)]
    pub content_disposition: Option<String>,
    #[serde(default)]
    pub content_language: Option<String>,
    #[serde(default)]
    pub expires: Option<String>,
}
209
/// Metadata for one uploaded part (`parts/<n>.toml` in the disk store).
/// Every field except `last_modified` is required when deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    pub part_number: u32,
    pub etag: String,
    /// Part size in bytes.
    pub size: u64,
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
218
/// A bucket-level configuration document persisted separately from
/// [`BucketMeta`]; each variant maps to its own file in the disk store.
///
/// Derives `Hash` (alongside `Eq`/`Copy`) so the kind can key a `HashMap` or
/// `HashSet`, per the Rust API guidelines on common traits for key-like types.
/// When adding a variant, also add it to `ALL_SUBRESOURCES`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
245
246pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
247    BucketSubresource::Tags,
248    BucketSubresource::Lifecycle,
249    BucketSubresource::Cors,
250    BucketSubresource::Policy,
251    BucketSubresource::Notification,
252    BucketSubresource::Logging,
253    BucketSubresource::Website,
254    BucketSubresource::PublicAccessBlock,
255    BucketSubresource::ObjectLock,
256    BucketSubresource::Replication,
257    BucketSubresource::Ownership,
258    BucketSubresource::Inventory,
259    BucketSubresource::Encryption,
260    BucketSubresource::Versioning,
261    BucketSubresource::Acl,
262    BucketSubresource::Accelerate,
263    BucketSubresource::Analytics,
264    BucketSubresource::IntelligentTiering,
265    BucketSubresource::Metrics,
266    BucketSubresource::RequestPayment,
267    BucketSubresource::Abac,
268    BucketSubresource::MetadataConfiguration,
269    BucketSubresource::MetadataTableConfiguration,
270];
271
/// Where an object or multipart-part body lives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Body held in RAM. Marked `#[serde(skip)]`, so this variant is never
    /// (de)serialized; only `Disk` references can round-trip through serde.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file on disk.
    Disk {
        /// Owning bucket name.
        bucket: String,
        /// Object key; multipart parts use `__mpu__/<upload_id>`.
        key: String,
        /// Version tag: `None` for the `null` version, `part-<n>` for
        /// multipart parts.
        #[serde(default)]
        version: Option<String>,
        /// Path of the `.bin` file holding the bytes.
        path: PathBuf,
        /// Body length in bytes (taken from file metadata on load).
        size: u64,
    },
}
285
286impl BodyRef {
287    pub fn size(&self) -> u64 {
288        match self {
289            BodyRef::Memory(b) => b.len() as u64,
290            BodyRef::Disk { size, .. } => *size,
291        }
292    }
293}
294
295impl Default for BodyRef {
296    fn default() -> Self {
297        BodyRef::Memory(Bytes::new())
298    }
299}
300
/// Input body handed to [`S3Store::put_object`] / [`S3Store::mpu_put_part`].
#[derive(Debug)]
pub enum BodySource {
    /// Body already materialized in RAM.
    Bytes(Bytes),
    /// Existing disk path that should be *moved* into the destination via
    /// rename (upload-tmp → final) and consumed.
    File(PathBuf),
    /// Existing disk path that should be *copied* into the destination and
    /// left in place. Used by replication so the source object stays
    /// available while the replica is produced.
    FileCopy(PathBuf),
}
312
313/// Materialize a [`BodySource`] into [`Bytes`]. The `File` variant is
314/// consumed (file is unlinked after read); `FileCopy` is left in place.
315/// Memory-mode stores call this to absorb tempfiles produced by the
316/// streaming dispatch path so streaming and non-streaming services share
317/// the same wire-shape regardless of `--storage-mode`.
318fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
319    match body {
320        BodySource::Bytes(b) => Ok(b),
321        BodySource::File(p) => {
322            let bytes = std::fs::read(&p)?;
323            // Best-effort: a leftover tempfile is not a correctness
324            // problem so the read result wins over the unlink result.
325            let _ = std::fs::remove_file(&p);
326            Ok(Bytes::from(bytes))
327        }
328        BodySource::FileCopy(p) => {
329            let bytes = std::fs::read(&p)?;
330            Ok(Bytes::from(bytes))
331        }
332    }
333}
334
/// Errors returned by [`S3Store`] implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying filesystem I/O failed.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A persisted file could not be parsed (e.g. a TOML parse failure
    /// during load); carries the parser's message.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not supported by this backend.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, e.g. a body file missing on disk.
    #[error("{0}")]
    Other(String),
}
346
347pub type StoreResult<T> = Result<T, StoreError>;
348
/// Persistence backend for the S3 service state.
///
/// Implementations: [`MemoryS3Store`] (persists nothing) and [`DiskS3Store`]
/// (files under a root directory).
pub trait S3Store: Send + Sync {
    /// Load all previously persisted state. `MemoryS3Store` always returns an
    /// empty [`S3State`].
    fn load(&self) -> StoreResult<S3State>;

    /// Persist bucket-level metadata for `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persist the raw `payload` of one bucket subresource (tags, CORS, ...).
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Remove one bucket subresource. `DiskS3Store` treats an already-absent
    /// subresource as success.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Remove a bucket and everything persisted under it.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Persist an object version's body and metadata and return the
    /// [`BodyRef`] the service should keep. `body` follows the
    /// [`BodySource`] contract (`File` is consumed, `FileCopy` left in place).
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Persist updated metadata for an object version without supplying a
    /// new body.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Remove one object version. NOTE(review): `None` presumably addresses
    /// the `null` version (`DiskS3Store` maps `None` to the `null` file tag).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Read the full body referenced by `body` into memory.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Record the creation of multipart upload `upload_id`.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Persist a multipart-upload part body. Returns the [`BodyRef`] the
    /// service should keep in its in-memory part map: `BodyRef::Disk`
    /// for disk-backed stores (so subsequent reads stream from the
    /// `.bin` file instead of holding the part in RAM), or
    /// `BodyRef::Memory` for memory-mode stores.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<BodyRef>;
    /// Where the streaming dispatch path should spool large request
    /// bodies to disk before handing them to [`Self::put_object`] /
    /// [`Self::mpu_put_part`]. Returning `Some` keeps the spool file on
    /// the same filesystem as the final destination so `rename(2)` is a
    /// metadata-only move; returning `None` means "use the system temp
    /// dir, the store will read the file back into RAM and unlink it".
    fn spool_dir(&self) -> Option<std::path::PathBuf> {
        None
    }
    /// Abort multipart upload `upload_id`.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Complete multipart upload `upload_id` as object `final_key`
    /// (optionally under `version`) and return the resulting body reference.
    /// `MemoryS3Store` returns an empty in-memory body here.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
413
/// [`S3Store`] for memory mode: it persists nothing, `load` returns an empty
/// state, and bodies are kept as [`BodyRef::Memory`] (spooled tempfiles are
/// read back into RAM).
pub struct MemoryS3Store;
415
416impl MemoryS3Store {
417    pub fn new() -> Self {
418        Self
419    }
420}
421
422impl Default for MemoryS3Store {
423    fn default() -> Self {
424        Self::new()
425    }
426}
427
428impl S3Store for MemoryS3Store {
429    fn load(&self) -> StoreResult<S3State> {
430        Ok(S3State::default())
431    }
432
433    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
434        Ok(())
435    }
436    fn put_bucket_subresource(
437        &self,
438        _bucket: &str,
439        _kind: BucketSubresource,
440        _payload: &str,
441    ) -> StoreResult<()> {
442        Ok(())
443    }
444    fn delete_bucket_subresource(
445        &self,
446        _bucket: &str,
447        _kind: BucketSubresource,
448    ) -> StoreResult<()> {
449        Ok(())
450    }
451    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
452        Ok(())
453    }
454
455    fn put_object(
456        &self,
457        _bucket: &str,
458        _key: &str,
459        _version: Option<&str>,
460        body: BodySource,
461        _meta: &ObjectMeta,
462    ) -> StoreResult<BodyRef> {
463        Ok(BodyRef::Memory(read_body_source(body)?))
464    }
465    fn put_object_meta(
466        &self,
467        _bucket: &str,
468        _key: &str,
469        _version: Option<&str>,
470        _meta: &ObjectMeta,
471    ) -> StoreResult<()> {
472        Ok(())
473    }
474    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
475        Ok(())
476    }
477    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
478        match body {
479            BodyRef::Memory(b) => Ok(b.clone()),
480            BodyRef::Disk { .. } => {
481                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
482            }
483        }
484    }
485
486    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
487        Ok(())
488    }
489    fn mpu_put_part(
490        &self,
491        _bucket: &str,
492        _upload_id: &str,
493        _part_number: u32,
494        body: BodySource,
495        _etag: &str,
496    ) -> StoreResult<BodyRef> {
497        // Memory mode keeps all part bodies in RAM. Absorb any tempfile
498        // produced by the streaming dispatch path here.
499        Ok(BodyRef::Memory(read_body_source(body)?))
500    }
501    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
502        Ok(())
503    }
504    fn mpu_complete(
505        &self,
506        _bucket: &str,
507        _upload_id: &str,
508        _final_key: &str,
509        _version: Option<&str>,
510        _meta: &ObjectMeta,
511    ) -> StoreResult<BodyRef> {
512        Ok(BodyRef::Memory(Bytes::new()))
513    }
514}
515
/// [`S3Store`] that persists state as files under `root`.
///
/// Layout (names escaped with `key_escape::escape_key_segment`):
/// `<root>/buckets/<bucket>/meta.toml`, one `<subresource>.toml` per
/// subresource, `objects/<key>/<version|null>.{bin,toml}`, and
/// `mpu/<upload_id>/init.toml` plus `parts/<n>.{bin,toml}`.
pub struct DiskS3Store {
    /// Root directory; bucket data lives under `<root>/buckets`.
    root: PathBuf,
    /// Shared body cache. NOTE(review): not read by the code visible here;
    /// presumably consulted when reading/writing bodies — confirm.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
520
521impl DiskS3Store {
522    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
523        Self { root, cache }
524    }
525
526    fn buckets_dir(&self) -> PathBuf {
527        self.root.join("buckets")
528    }
529
530    fn bucket_dir(&self, bucket: &str) -> PathBuf {
531        self.buckets_dir()
532            .join(crate::key_escape::escape_key_segment(bucket))
533    }
534
535    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
536        self.bucket_dir(bucket)
537            .join("objects")
538            .join(crate::key_escape::escape_key_segment(key))
539    }
540
541    fn version_tag(version: Option<&str>) -> String {
542        version.unwrap_or("null").to_string()
543    }
544
545    fn object_paths(
546        &self,
547        bucket: &str,
548        key: &str,
549        version: Option<&str>,
550    ) -> (PathBuf, PathBuf, PathBuf) {
551        let dir = self.object_dir(bucket, key);
552        let tag = Self::version_tag(version);
553        let bin = dir.join(format!("{}.bin", tag));
554        let toml = dir.join(format!("{}.toml", tag));
555        (dir, bin, toml)
556    }
557
558    fn subresource_filename(kind: BucketSubresource) -> &'static str {
559        match kind {
560            BucketSubresource::Tags => "tags.toml",
561            BucketSubresource::Lifecycle => "lifecycle.toml",
562            BucketSubresource::Cors => "cors.toml",
563            BucketSubresource::Policy => "policy.toml",
564            BucketSubresource::Notification => "notification.toml",
565            BucketSubresource::Logging => "logging.toml",
566            BucketSubresource::Website => "website.toml",
567            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
568            BucketSubresource::ObjectLock => "object_lock.toml",
569            BucketSubresource::Replication => "replication.toml",
570            BucketSubresource::Ownership => "ownership.toml",
571            BucketSubresource::Inventory => "inventory.toml",
572            BucketSubresource::Encryption => "encryption.toml",
573            BucketSubresource::Versioning => "versioning.toml",
574            BucketSubresource::Acl => "acl.toml",
575            BucketSubresource::Accelerate => "accelerate.toml",
576            BucketSubresource::Analytics => "analytics.toml",
577            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
578            BucketSubresource::Metrics => "metrics.toml",
579            BucketSubresource::RequestPayment => "request_payment.toml",
580            BucketSubresource::Abac => "abac.toml",
581            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
582            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
583        }
584    }
585
586    fn cleanup_empty(dir: &std::path::Path) {
587        let _ = std::fs::remove_dir(dir);
588    }
589
590    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
591        self.bucket_dir(bucket)
592            .join("mpu")
593            .join(crate::key_escape::escape_key_segment(upload_id))
594    }
595
596    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
597        self.mpu_dir(bucket, upload_id).join("parts")
598    }
599
600    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
601        self.mpu_parts_dir(bucket, upload_id)
602            .join(format!("{}.bin", part_number))
603    }
604
605    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
606        self.mpu_parts_dir(bucket, upload_id)
607            .join(format!("{}.toml", part_number))
608    }
609
610    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
611        crate::cache::BodyKey::new(
612            bucket.to_string(),
613            format!("__mpu__/{}", upload_id),
614            Some(format!("part-{}", part_number)),
615        )
616    }
617}
618
619fn io_other(msg: impl Into<String>) -> StoreError {
620    StoreError::Other(msg.into())
621}
622
623impl S3Store for DiskS3Store {
624    fn load(&self) -> StoreResult<S3State> {
625        let mut state = S3State::default();
626        let buckets_dir = self.buckets_dir();
627        if !buckets_dir.exists() {
628            return Ok(state);
629        }
630        for entry in std::fs::read_dir(&buckets_dir)? {
631            let entry = entry?;
632            if !entry.file_type()?.is_dir() {
633                continue;
634            }
635            let bdir = entry.path();
636            // Isolate per-bucket load so one corrupt/partial bucket is skipped
637            // with a warning instead of aborting the whole store load -- a single
638            // truncated file used to make every bucket inaccessible (bug-audit
639            // 2026-06-20, 4.3).
640            let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
641                let meta_path = bdir.join("meta.toml");
642                if !meta_path.exists() {
643                    return Ok(None);
644                }
645                let meta_text = std::fs::read_to_string(&meta_path)?;
646                let mut meta: BucketMeta =
647                    toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
648                let mut snap = BucketSnapshot {
649                    meta: meta.clone(),
650                    objects: BTreeMap::new(),
651                    object_versions: BTreeMap::new(),
652                    subresources: BTreeMap::new(),
653                    multipart_uploads: BTreeMap::new(),
654                };
655
656                for kind in ALL_SUBRESOURCES {
657                    let fname = Self::subresource_filename(*kind);
658                    let path = bdir.join(fname);
659                    if path.exists() {
660                        let text = std::fs::read_to_string(&path)?;
661                        if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
662                        {
663                            let stripped = text.trim();
664                            if !stripped.is_empty() {
665                                snap.meta.versioning = Some(stripped.to_string());
666                                meta.versioning = snap.meta.versioning.clone();
667                            }
668                        }
669                        snap.subresources.insert(fname.to_string(), text);
670                    }
671                }
672
673                let objects_root = bdir.join("objects");
674                if objects_root.exists() {
675                    for okey_entry in std::fs::read_dir(&objects_root)? {
676                        let okey_entry = okey_entry?;
677                        if !okey_entry.file_type()?.is_dir() {
678                            continue;
679                        }
680                        let key_dir = okey_entry.path();
681                        let mut versioned: Vec<LoadedObject> = Vec::new();
682                        let mut key_name: Option<String> = None;
683                        for version_entry in std::fs::read_dir(&key_dir)? {
684                            let version_entry = version_entry?;
685                            let path = version_entry.path();
686                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
687                                continue;
688                            };
689                            if !fname.ends_with(".toml") {
690                                continue;
691                            }
692                            let version_tag = &fname[..fname.len() - 5];
693                            let toml_text = std::fs::read_to_string(&path)?;
694                            let obj_meta: ObjectMeta = toml::from_str(&toml_text)
695                                .map_err(|e| StoreError::Serde(e.to_string()))?;
696                            let bin_path = key_dir.join(format!("{}.bin", version_tag));
697                            let (body, size) = if obj_meta.is_delete_marker {
698                                (BodyRef::Memory(Bytes::new()), 0u64)
699                            } else if bin_path.exists() {
700                                let sz = std::fs::metadata(&bin_path)?.len();
701                                (
702                                    BodyRef::Disk {
703                                        bucket: meta.name.clone(),
704                                        key: obj_meta.key.clone(),
705                                        version: if version_tag == "null" {
706                                            None
707                                        } else {
708                                            Some(version_tag.to_string())
709                                        },
710                                        path: bin_path,
711                                        size: sz,
712                                    },
713                                    sz,
714                                )
715                            } else {
716                                // Fail loud: the sidecar says this object has a
717                                // body but the .bin file is missing. Returning
718                                // silently would hand the caller a truncated
719                                // object and hide data loss.
720                                return Err(StoreError::Other(format!(
721                                    "missing body file: {}",
722                                    bin_path.display()
723                                )));
724                            };
725                            let _ = size;
726                            key_name.get_or_insert_with(|| obj_meta.key.clone());
727                            if version_tag == "null" && obj_meta.version_id.is_none() {
728                                snap.objects.insert(
729                                    obj_meta.key.clone(),
730                                    LoadedObject {
731                                        meta: obj_meta,
732                                        body,
733                                    },
734                                );
735                            } else {
736                                versioned.push(LoadedObject {
737                                    meta: obj_meta,
738                                    body,
739                                });
740                            }
741                        }
742                        if !versioned.is_empty() {
743                            versioned.sort_by_key(|v| v.meta.last_modified);
744                            if let Some(key) = key_name {
745                                // Reconcile snap.objects with the newest version:
746                                // a trailing delete marker hides any prior null or
747                                // live version (remove it); otherwise overwrite
748                                // with the newest live version, even if a
749                                // pre-versioning null.toml had already been
750                                // inserted during the non-versioned scan.
751                                match versioned.last() {
752                                    Some(newest) if newest.meta.is_delete_marker => {
753                                        snap.objects.remove(&key);
754                                    }
755                                    Some(newest) => {
756                                        snap.objects.insert(key.clone(), newest.clone());
757                                    }
758                                    None => {}
759                                }
760                                snap.object_versions.insert(key, versioned);
761                            }
762                        }
763                    }
764                }
765
766                let mpu_root = bdir.join("mpu");
767                if mpu_root.exists() {
768                    for upload_entry in std::fs::read_dir(&mpu_root)? {
769                        let upload_entry = upload_entry?;
770                        if !upload_entry.file_type()?.is_dir() {
771                            continue;
772                        }
773                        let upload_dir = upload_entry.path();
774                        let init_path = upload_dir.join("init.toml");
775                        if !init_path.exists() {
776                            continue;
777                        }
778                        let init_text = std::fs::read_to_string(&init_path)?;
779                        let init: MpuInit = toml::from_str(&init_text)
780                            .map_err(|e| StoreError::Serde(e.to_string()))?;
781                        let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
782                        let parts_dir = upload_dir.join("parts");
783                        if parts_dir.exists() {
784                            for part_entry in std::fs::read_dir(&parts_dir)? {
785                                let part_entry = part_entry?;
786                                let path = part_entry.path();
787                                let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
788                                    continue;
789                                };
790                                if !fname.ends_with(".toml") {
791                                    continue;
792                                }
793                                let stem = &fname[..fname.len() - 5];
794                                let Ok(part_number) = stem.parse::<u32>() else {
795                                    continue;
796                                };
797                                let toml_text = std::fs::read_to_string(&path)?;
798                                let part_meta: UploadPartMeta = toml::from_str(&toml_text)
799                                    .map_err(|e| StoreError::Serde(e.to_string()))?;
800                                let bin_path = parts_dir.join(format!("{}.bin", part_number));
801                                if !bin_path.exists() {
802                                    return Err(StoreError::Other(format!(
803                                        "missing multipart part body file: {}",
804                                        bin_path.display()
805                                    )));
806                                }
807                                let sz = std::fs::metadata(&bin_path)?.len();
808                                let body = BodyRef::Disk {
809                                    bucket: meta.name.clone(),
810                                    key: format!("__mpu__/{}", init.upload_id),
811                                    version: Some(format!("part-{}", part_number)),
812                                    path: bin_path,
813                                    size: sz,
814                                };
815                                loaded_parts.insert(
816                                    part_number,
817                                    LoadedPart {
818                                        meta: part_meta,
819                                        body,
820                                    },
821                                );
822                            }
823                        }
824                        snap.multipart_uploads.insert(
825                            init.upload_id.clone(),
826                            LoadedMpu {
827                                init,
828                                parts: loaded_parts,
829                            },
830                        );
831                    }
832                }
833
834                Ok(Some(snap))
835            })();
836            match loaded {
837                Ok(Some(snap)) => {
838                    state.buckets.insert(snap.meta.name.clone(), snap);
839                }
840                Ok(None) => {}
841                Err(e) => tracing::warn!(
842                    bucket = %bdir.display(),
843                    error = %e,
844                    "skipping unreadable S3 bucket during load"
845                ),
846            }
847        }
848        Ok(state)
849    }
850
851    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
852        let dir = self.bucket_dir(bucket);
853        std::fs::create_dir_all(&dir)?;
854        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
855        Ok(())
856    }
857
858    fn put_bucket_subresource(
859        &self,
860        bucket: &str,
861        kind: BucketSubresource,
862        payload: &str,
863    ) -> StoreResult<()> {
864        let dir = self.bucket_dir(bucket);
865        std::fs::create_dir_all(&dir)?;
866        let path = dir.join(Self::subresource_filename(kind));
867        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
868        Ok(())
869    }
870
871    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
872        let path = self
873            .bucket_dir(bucket)
874            .join(Self::subresource_filename(kind));
875        match std::fs::remove_file(&path) {
876            Ok(_) => Ok(()),
877            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
878            Err(e) => Err(e.into()),
879        }
880    }
881
882    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
883        let dir = self.bucket_dir(bucket);
884        match std::fs::remove_dir_all(&dir) {
885            Ok(_) => Ok(()),
886            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
887            Err(e) => Err(e.into()),
888        }
889    }
890
891    fn put_object(
892        &self,
893        bucket: &str,
894        key: &str,
895        version: Option<&str>,
896        body: BodySource,
897        meta: &ObjectMeta,
898    ) -> StoreResult<BodyRef> {
899        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
900        std::fs::create_dir_all(&dir)?;
901
902        if meta.is_delete_marker {
903            crate::atomic::write_atomic_toml(&toml_path, meta)?;
904            return Ok(BodyRef::Memory(Bytes::new()));
905        }
906
907        let size: u64;
908        let bytes_for_cache: Option<Bytes>;
909        match body {
910            BodySource::Bytes(b) => {
911                size = b.len() as u64;
912                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
913                bytes_for_cache = Some(b);
914            }
915            BodySource::File(src) => {
916                let src_size = std::fs::metadata(&src)?.len();
917                size = src_size;
918                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
919                bytes_for_cache = None;
920            }
921            BodySource::FileCopy(src) => {
922                let src_size = std::fs::metadata(&src)?.len();
923                size = src_size;
924                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
925                bytes_for_cache = None;
926            }
927        }
928
929        crate::atomic::write_atomic_toml(&toml_path, meta)?;
930
931        let body_key = crate::cache::BodyKey::new(
932            bucket.to_string(),
933            key.to_string(),
934            version.map(|s| s.to_string()),
935        );
936        if let Some(b) = bytes_for_cache {
937            self.cache.insert(body_key, b);
938        } else {
939            self.cache.invalidate(&crate::cache::BodyKey::new(
940                bucket.to_string(),
941                key.to_string(),
942                version.map(|s| s.to_string()),
943            ));
944        }
945
946        Ok(BodyRef::Disk {
947            bucket: bucket.to_string(),
948            key: key.to_string(),
949            version: version.map(|s| s.to_string()),
950            path: bin_path,
951            size,
952        })
953    }
954
955    fn put_object_meta(
956        &self,
957        bucket: &str,
958        key: &str,
959        version: Option<&str>,
960        meta: &ObjectMeta,
961    ) -> StoreResult<()> {
962        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
963        std::fs::create_dir_all(&dir)?;
964        crate::atomic::write_atomic_toml(&toml_path, meta)?;
965        Ok(())
966    }
967
968    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
969        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
970        for p in [&bin_path, &toml_path] {
971            match std::fs::remove_file(p) {
972                Ok(_) => {}
973                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
974                Err(e) => return Err(e.into()),
975            }
976        }
977        Self::cleanup_empty(&dir);
978
979        self.cache.invalidate(&crate::cache::BodyKey::new(
980            bucket.to_string(),
981            key.to_string(),
982            version.map(|s| s.to_string()),
983        ));
984        Ok(())
985    }
986
987    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
988        match body {
989            BodyRef::Memory(b) => Ok(b.clone()),
990            BodyRef::Disk {
991                bucket,
992                key,
993                version,
994                path,
995                size: _,
996            } => {
997                let body_key =
998                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
999                if let Some(bytes) = self.cache.get(&body_key) {
1000                    return Ok(bytes);
1001                }
1002                let bytes = Bytes::from(std::fs::read(path)?);
1003                self.cache.insert(body_key, bytes.clone());
1004                Ok(bytes)
1005            }
1006        }
1007    }
1008
1009    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
1010        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1011        std::fs::create_dir_all(&parts_dir)?;
1012        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
1013        crate::atomic::write_atomic_toml(&init_path, init)?;
1014        Ok(())
1015    }
1016
1017    fn mpu_put_part(
1018        &self,
1019        bucket: &str,
1020        upload_id: &str,
1021        part_number: u32,
1022        body: BodySource,
1023        etag: &str,
1024    ) -> StoreResult<BodyRef> {
1025        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1026        std::fs::create_dir_all(&parts_dir)?;
1027        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1028        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1029
1030        let size: u64 = match body {
1031            BodySource::Bytes(b) => {
1032                let n = b.len() as u64;
1033                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1034                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1035                self.cache.insert(cache_key, b);
1036                n
1037            }
1038            BodySource::File(src) => {
1039                let n = std::fs::metadata(&src)?.len();
1040                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1041                self.cache
1042                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1043                n
1044            }
1045            BodySource::FileCopy(src) => {
1046                let n = std::fs::metadata(&src)?.len();
1047                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1048                self.cache
1049                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1050                n
1051            }
1052        };
1053
1054        let meta = UploadPartMeta {
1055            part_number,
1056            etag: etag.to_string(),
1057            size,
1058            last_modified: Utc::now(),
1059        };
1060        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1061        // Disk-mode parts live on disk; expose them as BodyRef::Disk so
1062        // the runtime state never holds the part body in RAM.
1063        Ok(BodyRef::Disk {
1064            bucket: bucket.to_string(),
1065            key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1066            version: None,
1067            path: bin_path,
1068            size,
1069        })
1070    }
1071
1072    fn spool_dir(&self) -> Option<std::path::PathBuf> {
1073        let dir = self.root.join(".spool");
1074        // Best-effort: create lazily; the spool helper also creates if
1075        // missing. Returning the path even when create fails is safe —
1076        // the helper handles both create and the actual file open.
1077        let _ = std::fs::create_dir_all(&dir);
1078        Some(dir)
1079    }
1080
1081    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1082        let dir = self.mpu_dir(bucket, upload_id);
1083        // Invalidate any cached part bodies for this upload.
1084        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1085            for entry in entries.flatten() {
1086                let path = entry.path();
1087                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1088                    if let Some(stem) = fname.strip_suffix(".bin") {
1089                        if let Ok(n) = stem.parse::<u32>() {
1090                            self.cache
1091                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1092                        }
1093                    }
1094                }
1095            }
1096        }
1097        match std::fs::remove_dir_all(&dir) {
1098            Ok(_) => Ok(()),
1099            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1100            Err(e) => Err(e.into()),
1101        }
1102    }
1103
1104    fn mpu_complete(
1105        &self,
1106        bucket: &str,
1107        upload_id: &str,
1108        final_key: &str,
1109        version: Option<&str>,
1110        meta: &ObjectMeta,
1111    ) -> StoreResult<BodyRef> {
1112        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1113        if !parts_dir.exists() {
1114            return Err(io_other(format!(
1115                "mpu_complete: no parts dir for upload {}",
1116                upload_id
1117            )));
1118        }
1119
1120        // Enumerate parts in ascending part-number order.
1121        let mut part_numbers: Vec<u32> = Vec::new();
1122        for entry in std::fs::read_dir(&parts_dir)? {
1123            let entry = entry?;
1124            let path = entry.path();
1125            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1126                continue;
1127            };
1128            if let Some(stem) = fname.strip_suffix(".toml") {
1129                if let Ok(n) = stem.parse::<u32>() {
1130                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1131                        part_numbers.push(n);
1132                    }
1133                }
1134            }
1135        }
1136        part_numbers.sort_unstable();
1137        if part_numbers.is_empty() {
1138            return Err(io_other(format!(
1139                "mpu_complete: upload {} has no parts",
1140                upload_id
1141            )));
1142        }
1143
1144        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1145        std::fs::create_dir_all(&dir)?;
1146
1147        let total_size: u64 = if part_numbers.len() == 1 {
1148            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1149            let sz = std::fs::metadata(&only)?.len();
1150            match std::fs::rename(&only, &bin_path) {
1151                Ok(_) => {}
1152                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1153                    // Cross-device rename: fall back to streaming copy then remove source.
1154                    {
1155                        let mut input = std::fs::File::open(&only)?;
1156                        let tmp = {
1157                            let mut os = bin_path.as_os_str().to_owned();
1158                            os.push(".tmp");
1159                            PathBuf::from(os)
1160                        };
1161                        let mut out = std::fs::OpenOptions::new()
1162                            .write(true)
1163                            .create(true)
1164                            .truncate(true)
1165                            .open(&tmp)?;
1166                        std::io::copy(&mut input, &mut out)?;
1167                        out.sync_all()?;
1168                        std::fs::rename(&tmp, &bin_path)?;
1169                    }
1170                    let _ = std::fs::remove_file(&only);
1171                }
1172                Err(e) => return Err(e.into()),
1173            }
1174            // fsync the parent directory so the rename that links the completed
1175            // object is durable, matching the multi-part branch below. Without
1176            // it a power-loss could leave a 200-OK'd single-part MPU not durably
1177            // linked, poisoning the bucket load (bug-audit 2026-06-20, 4.4).
1178            if let Some(parent) = bin_path.parent() {
1179                if let Ok(dir_handle) = std::fs::File::open(parent) {
1180                    let _ = dir_handle.sync_all();
1181                }
1182            }
1183            sz
1184        } else {
1185            let tmp = {
1186                let mut os = bin_path.as_os_str().to_owned();
1187                os.push(".tmp");
1188                PathBuf::from(os)
1189            };
1190            let mut total: u64 = 0;
1191            {
1192                let mut out = std::fs::OpenOptions::new()
1193                    .write(true)
1194                    .create(true)
1195                    .truncate(true)
1196                    .open(&tmp)?;
1197                for n in &part_numbers {
1198                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1199                    let mut input = std::fs::File::open(&part_path)?;
1200                    let copied = std::io::copy(&mut input, &mut out)?;
1201                    total += copied;
1202                }
1203                out.sync_all()?;
1204            }
1205            std::fs::rename(&tmp, &bin_path)?;
1206            if let Some(parent) = bin_path.parent() {
1207                if let Ok(dir_handle) = std::fs::File::open(parent) {
1208                    let _ = dir_handle.sync_all();
1209                }
1210            }
1211            total
1212        };
1213
1214        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1215
1216        // Invalidate per-part cache entries and drop the mpu dir.
1217        for n in &part_numbers {
1218            self.cache
1219                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1220        }
1221        let mpu_dir = self.mpu_dir(bucket, upload_id);
1222        let _ = std::fs::remove_dir_all(&mpu_dir);
1223
1224        // The concatenated body is deliberately NOT re-inserted into the cache —
1225        // large multipart uploads typically exceed the single-object cap, and we
1226        // must never round-trip the assembled body through RAM. The next
1227        // open_object_body call will load through the normal cache path.
1228        self.cache.invalidate(&crate::cache::BodyKey::new(
1229            bucket.to_string(),
1230            final_key.to_string(),
1231            version.map(|s| s.to_string()),
1232        ));
1233
1234        Ok(BodyRef::Disk {
1235            bucket: bucket.to_string(),
1236            key: final_key.to_string(),
1237            version: version.map(|s| s.to_string()),
1238            path: bin_path,
1239            size: total_size,
1240        })
1241    }
1242}
1243
/// Raw OS error code that `std::fs::rename` reports when the source and
/// destination are on different filesystems.
///
/// On Unix-like systems this is `EXDEV` (18 on Linux, macOS and the BSDs).
/// Windows reports `ERROR_NOT_SAME_DEVICE` (17) instead. Without the Windows
/// variant, the cross-device fallback in `mpu_complete` could never trigger
/// there.
#[cfg(not(windows))]
fn libc_exdev() -> i32 {
    18
}

/// Raw OS error code that `std::fs::rename` reports when the source and
/// destination are on different volumes (`ERROR_NOT_SAME_DEVICE`).
#[cfg(windows)]
fn libc_exdev() -> i32 {
    17
}
1247
1248#[cfg(test)]
1249mod disk_tests {
1250    use super::*;
1251    use std::sync::Arc;
1252    use tempfile::TempDir;
1253
1254    fn new_store(tmp: &TempDir) -> DiskS3Store {
1255        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1256        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1257    }
1258
1259    fn new_store_with_cache(
1260        tmp: &TempDir,
1261        cap: u64,
1262    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1263        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1264        (
1265            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1266            cache,
1267        )
1268    }
1269
1270    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1271        ObjectMeta {
1272            key: key.to_string(),
1273            content_type: "application/octet-stream".to_string(),
1274            etag: "etag".to_string(),
1275            size,
1276            ..Default::default()
1277        }
1278    }
1279
1280    #[test]
1281    fn put_bucket_meta_roundtrip() {
1282        let tmp = TempDir::new().unwrap();
1283        let store = new_store(&tmp);
1284        let meta = BucketMeta {
1285            name: "b1".to_string(),
1286            region: "us-east-1".to_string(),
1287            versioning: Some("Enabled".to_string()),
1288            ..Default::default()
1289        };
1290        store.put_bucket_meta("b1", &meta).unwrap();
1291        let loaded = store.load().unwrap();
1292        let snap = loaded.buckets.get("b1").unwrap();
1293        assert_eq!(snap.meta.name, "b1");
1294        assert_eq!(snap.meta.region, "us-east-1");
1295        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1296    }
1297
1298    #[test]
1299    fn put_bucket_subresource_each_variant_writes_file() {
1300        let tmp = TempDir::new().unwrap();
1301        let store = new_store(&tmp);
1302        store
1303            .put_bucket_meta(
1304                "b",
1305                &BucketMeta {
1306                    name: "b".to_string(),
1307                    ..Default::default()
1308                },
1309            )
1310            .unwrap();
1311        let variants = [
1312            BucketSubresource::Tags,
1313            BucketSubresource::Lifecycle,
1314            BucketSubresource::Cors,
1315            BucketSubresource::Policy,
1316            BucketSubresource::Notification,
1317            BucketSubresource::Logging,
1318            BucketSubresource::Website,
1319            BucketSubresource::PublicAccessBlock,
1320            BucketSubresource::ObjectLock,
1321            BucketSubresource::Replication,
1322            BucketSubresource::Ownership,
1323            BucketSubresource::Inventory,
1324            BucketSubresource::Encryption,
1325            BucketSubresource::Versioning,
1326            BucketSubresource::Acl,
1327            BucketSubresource::Accelerate,
1328        ];
1329        for v in variants {
1330            store
1331                .put_bucket_subresource("b", v, "payload=true")
1332                .unwrap();
1333            let file = store
1334                .bucket_dir("b")
1335                .join(DiskS3Store::subresource_filename(v));
1336            assert!(file.exists(), "{:?}", v);
1337            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1338        }
1339    }
1340
1341    #[test]
1342    fn delete_bucket_subresource_removes_file() {
1343        let tmp = TempDir::new().unwrap();
1344        let store = new_store(&tmp);
1345        store
1346            .put_bucket_meta(
1347                "b",
1348                &BucketMeta {
1349                    name: "b".to_string(),
1350                    ..Default::default()
1351                },
1352            )
1353            .unwrap();
1354        store
1355            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1356            .unwrap();
1357        store
1358            .delete_bucket_subresource("b", BucketSubresource::Tags)
1359            .unwrap();
1360        let file = store.bucket_dir("b").join("tags.toml");
1361        assert!(!file.exists());
1362        // idempotent
1363        store
1364            .delete_bucket_subresource("b", BucketSubresource::Tags)
1365            .unwrap();
1366    }
1367
1368    #[test]
1369    fn put_object_bytes_roundtrip() {
1370        let tmp = TempDir::new().unwrap();
1371        let store = new_store(&tmp);
1372        store
1373            .put_bucket_meta(
1374                "b",
1375                &BucketMeta {
1376                    name: "b".to_string(),
1377                    ..Default::default()
1378                },
1379            )
1380            .unwrap();
1381        let data = Bytes::from_static(b"hello world");
1382        let meta = sample_meta("k1", data.len() as u64);
1383        let body_ref = store
1384            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1385            .unwrap();
1386        match &body_ref {
1387            BodyRef::Disk {
1388                bucket,
1389                key,
1390                size,
1391                path,
1392                ..
1393            } => {
1394                assert_eq!(bucket, "b");
1395                assert_eq!(key, "k1");
1396                assert_eq!(*size, data.len() as u64);
1397                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1398            }
1399            _ => panic!("expected Disk"),
1400        }
1401        let loaded = store.load().unwrap();
1402        let snap = loaded.buckets.get("b").unwrap();
1403        let obj = snap.objects.get("k1").unwrap();
1404        assert_eq!(obj.meta.size, data.len() as u64);
1405    }
1406
1407    #[test]
1408    fn put_object_file_copy_source_leaves_src_in_place() {
1409        let tmp = TempDir::new().unwrap();
1410        let store = new_store(&tmp);
1411        store
1412            .put_bucket_meta(
1413                "b",
1414                &BucketMeta {
1415                    name: "b".to_string(),
1416                    ..Default::default()
1417                },
1418            )
1419            .unwrap();
1420        let src_dir = TempDir::new().unwrap();
1421        let src = src_dir.path().join("src.bin");
1422        std::fs::write(&src, b"copied-body").unwrap();
1423        let meta = sample_meta("k", 11);
1424        let bref = store
1425            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1426            .unwrap();
1427        match bref {
1428            BodyRef::Disk { path, size, .. } => {
1429                assert_eq!(size, 11);
1430                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1431            }
1432            _ => panic!("expected Disk bodyref"),
1433        }
1434        assert!(src.exists(), "source file must not be moved by FileCopy");
1435        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1436    }
1437
1438    #[test]
1439    fn put_object_file_source() {
1440        let tmp = TempDir::new().unwrap();
1441        let store = new_store(&tmp);
1442        store
1443            .put_bucket_meta(
1444                "b",
1445                &BucketMeta {
1446                    name: "b".to_string(),
1447                    ..Default::default()
1448                },
1449            )
1450            .unwrap();
1451        let src = tmp.path().join("src.bin");
1452        std::fs::write(&src, b"file-body").unwrap();
1453        let meta = sample_meta("k", 9);
1454        let body_ref = store
1455            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1456            .unwrap();
1457        let path = match body_ref {
1458            BodyRef::Disk { path, .. } => path,
1459            _ => panic!(),
1460        };
1461        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1462    }
1463
1464    #[test]
1465    fn put_object_meta_only_keeps_bin() {
1466        let tmp = TempDir::new().unwrap();
1467        let store = new_store(&tmp);
1468        store
1469            .put_bucket_meta(
1470                "b",
1471                &BucketMeta {
1472                    name: "b".to_string(),
1473                    ..Default::default()
1474                },
1475            )
1476            .unwrap();
1477        let data = Bytes::from_static(b"abc");
1478        let mut meta = sample_meta("k", 3);
1479        store
1480            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1481            .unwrap();
1482        let (_, bin, _) = store.object_paths("b", "k", None);
1483        let before = std::fs::read(&bin).unwrap();
1484        meta.tags.insert("x".to_string(), "y".to_string());
1485        store.put_object_meta("b", "k", None, &meta).unwrap();
1486        assert_eq!(std::fs::read(&bin).unwrap(), before);
1487        let loaded = store.load().unwrap();
1488        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1489        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1490    }
1491
1492    #[test]
1493    fn delete_object_cleans_up_files_and_cache() {
1494        let tmp = TempDir::new().unwrap();
1495        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1496        store
1497            .put_bucket_meta(
1498                "b",
1499                &BucketMeta {
1500                    name: "b".to_string(),
1501                    ..Default::default()
1502                },
1503            )
1504            .unwrap();
1505        let data = Bytes::from_static(b"bye");
1506        let meta = sample_meta("k", 3);
1507        store
1508            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1509            .unwrap();
1510        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1511        assert!(cache.get(&body_key).is_some());
1512        store.delete_object("b", "k", None).unwrap();
1513        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1514        assert!(!bin.exists());
1515        assert!(!toml_path.exists());
1516        assert!(!dir.exists());
1517        assert!(cache.get(&body_key).is_none());
1518    }
1519
1520    #[test]
1521    fn open_object_body_cache_hit_and_refill() {
1522        let tmp = TempDir::new().unwrap();
1523        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1524        store
1525            .put_bucket_meta(
1526                "b",
1527                &BucketMeta {
1528                    name: "b".to_string(),
1529                    ..Default::default()
1530                },
1531            )
1532            .unwrap();
1533        let data = Bytes::from_static(b"payload");
1534        let meta = sample_meta("k", data.len() as u64);
1535        let body_ref = store
1536            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1537            .unwrap();
1538        // Cache hit.
1539        let got = store.open_object_body(&body_ref).unwrap();
1540        assert_eq!(got, data);
1541        // Invalidate and re-read populates cache from disk.
1542        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1543        cache.invalidate(&body_key);
1544        assert!(cache.get(&body_key).is_none());
1545        let got = store.open_object_body(&body_ref).unwrap();
1546        assert_eq!(got, data);
1547        assert!(cache.get(&body_key).is_some());
1548    }
1549
1550    #[test]
1551    fn open_object_body_large_bypasses_cache() {
1552        let tmp = TempDir::new().unwrap();
1553        // capacity 1024 → single-object cap 512. Use 800-byte body.
1554        let (store, cache) = new_store_with_cache(&tmp, 1024);
1555        store
1556            .put_bucket_meta(
1557                "b",
1558                &BucketMeta {
1559                    name: "b".to_string(),
1560                    ..Default::default()
1561                },
1562            )
1563            .unwrap();
1564        let data = Bytes::from(vec![7u8; 800]);
1565        let meta = sample_meta("big", 800);
1566        let body_ref = store
1567            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1568            .unwrap();
1569        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1570        assert!(cache.get(&body_key).is_none());
1571        let got = store.open_object_body(&body_ref).unwrap();
1572        assert_eq!(got, data);
1573        // Still none — exceeds single-object cap.
1574        assert!(cache.get(&body_key).is_none());
1575    }
1576
1577    #[test]
1578    fn load_empty_dir() {
1579        let tmp = TempDir::new().unwrap();
1580        let store = new_store(&tmp);
1581        let s = store.load().unwrap();
1582        assert!(s.buckets.is_empty());
1583    }
1584
1585    #[test]
1586    fn load_skips_mpu_without_init() {
1587        let tmp = TempDir::new().unwrap();
1588        let store = new_store(&tmp);
1589        store
1590            .put_bucket_meta(
1591                "b",
1592                &BucketMeta {
1593                    name: "b".to_string(),
1594                    ..Default::default()
1595                },
1596            )
1597            .unwrap();
1598        let data = Bytes::from_static(b"x");
1599        let meta = sample_meta("k", 1);
1600        store
1601            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1602            .unwrap();
1603        // Directory with no init.toml is skipped by load.
1604        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1605        std::fs::create_dir_all(&mpu).unwrap();
1606
1607        let loaded = store.load().unwrap();
1608        let snap = loaded.buckets.get("b").unwrap();
1609        assert_eq!(snap.objects.len(), 1);
1610        assert!(snap.multipart_uploads.is_empty());
1611    }
1612
1613    #[test]
1614    fn load_reads_bucket_subresources() {
1615        let tmp = TempDir::new().unwrap();
1616        let store = new_store(&tmp);
1617        store
1618            .put_bucket_meta(
1619                "b",
1620                &BucketMeta {
1621                    name: "b".to_string(),
1622                    ..Default::default()
1623                },
1624            )
1625            .unwrap();
1626        store
1627            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1628            .unwrap();
1629        store
1630            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1631            .unwrap();
1632        store
1633            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1634            .unwrap();
1635        store
1636            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1637            .unwrap();
1638        let loaded = store.load().unwrap();
1639        let snap = loaded.buckets.get("b").unwrap();
1640        assert_eq!(
1641            snap.subresources.get("lifecycle.toml").map(String::as_str),
1642            Some("<Lifecycle/>"),
1643        );
1644        assert_eq!(
1645            snap.subresources.get("cors.toml").map(String::as_str),
1646            Some("<Cors/>"),
1647        );
1648        assert_eq!(
1649            snap.subresources.get("policy.toml").map(String::as_str),
1650            Some("{}"),
1651        );
1652        assert_eq!(
1653            snap.subresources.get("tags.toml").map(String::as_str),
1654            Some("x=1"),
1655        );
1656    }
1657
1658    #[test]
1659    fn versioned_put_load_roundtrip() {
1660        let tmp = TempDir::new().unwrap();
1661        let store = new_store(&tmp);
1662        store
1663            .put_bucket_meta(
1664                "b",
1665                &BucketMeta {
1666                    name: "b".to_string(),
1667                    versioning: Some("Enabled".to_string()),
1668                    ..Default::default()
1669                },
1670            )
1671            .unwrap();
1672        let base = chrono::Utc::now();
1673        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1674            .iter()
1675            .enumerate()
1676        {
1677            let mut m = sample_meta("k", body.len() as u64);
1678            m.version_id = Some((*vid).to_string());
1679            m.last_modified = base + chrono::Duration::seconds(i as i64);
1680            store
1681                .put_object(
1682                    "b",
1683                    "k",
1684                    Some(*vid),
1685                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1686                    &m,
1687                )
1688                .unwrap();
1689        }
1690        let loaded = store.load().unwrap();
1691        let snap = loaded.buckets.get("b").unwrap();
1692        let versions = snap.object_versions.get("k").expect("versions present");
1693        assert_eq!(versions.len(), 3);
1694        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1695        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1696        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1697        for v in versions {
1698            match &v.body {
1699                BodyRef::Disk { size, .. } => assert!(*size > 0),
1700                _ => panic!("expected Disk body"),
1701            }
1702        }
1703    }
1704
1705    #[test]
1706    fn versioned_load_promotes_latest_live_to_snap_objects() {
1707        let tmp = TempDir::new().unwrap();
1708        let store = new_store(&tmp);
1709        store
1710            .put_bucket_meta(
1711                "b",
1712                &BucketMeta {
1713                    name: "b".to_string(),
1714                    versioning: Some("Enabled".to_string()),
1715                    ..Default::default()
1716                },
1717            )
1718            .unwrap();
1719        let base = chrono::Utc::now();
1720        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1721            .iter()
1722            .enumerate()
1723        {
1724            let mut m = sample_meta("k", body.len() as u64);
1725            m.version_id = Some((*vid).to_string());
1726            m.last_modified = base + chrono::Duration::seconds(i as i64);
1727            store
1728                .put_object(
1729                    "b",
1730                    "k",
1731                    Some(*vid),
1732                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1733                    &m,
1734                )
1735                .unwrap();
1736        }
1737        let loaded = store.load().unwrap();
1738        let snap = loaded.buckets.get("b").unwrap();
1739        let current = snap.objects.get("k").expect("current object promoted");
1740        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1741    }
1742
1743    #[test]
1744    fn versioned_load_trailing_delete_marker_hides_current() {
1745        let tmp = TempDir::new().unwrap();
1746        let store = new_store(&tmp);
1747        store
1748            .put_bucket_meta(
1749                "b",
1750                &BucketMeta {
1751                    name: "b".to_string(),
1752                    versioning: Some("Enabled".to_string()),
1753                    ..Default::default()
1754                },
1755            )
1756            .unwrap();
1757        let base = chrono::Utc::now();
1758        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1759            .iter()
1760            .enumerate()
1761        {
1762            let mut m = sample_meta("k", body.len() as u64);
1763            m.version_id = Some((*vid).to_string());
1764            m.last_modified = base + chrono::Duration::seconds(i as i64);
1765            store
1766                .put_object(
1767                    "b",
1768                    "k",
1769                    Some(*vid),
1770                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1771                    &m,
1772                )
1773                .unwrap();
1774        }
1775        // Append a delete marker on top.
1776        let mut dm = sample_meta("k", 0);
1777        dm.version_id = Some("dm1".to_string());
1778        dm.is_delete_marker = true;
1779        dm.last_modified = base + chrono::Duration::seconds(10);
1780        store
1781            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1782            .unwrap();
1783        let loaded = store.load().unwrap();
1784        let snap = loaded.buckets.get("b").unwrap();
1785        assert!(
1786            !snap.objects.contains_key("k"),
1787            "trailing delete marker must hide current object",
1788        );
1789        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1790    }
1791
1792    #[test]
1793    fn legacy_null_object_overridden_by_newer_versions() {
1794        // A pre-versioning null put followed by versioning-enabled puts must
1795        // see snap.objects reflect the newest live version, not the stale null.
1796        let tmp = TempDir::new().unwrap();
1797        let store = new_store(&tmp);
1798        store
1799            .put_bucket_meta(
1800                "b",
1801                &BucketMeta {
1802                    name: "b".to_string(),
1803                    ..Default::default()
1804                },
1805            )
1806            .unwrap();
1807        let base = chrono::Utc::now();
1808        let mut null_meta = sample_meta("k", 3);
1809        null_meta.last_modified = base;
1810        store
1811            .put_object(
1812                "b",
1813                "k",
1814                None,
1815                BodySource::Bytes(Bytes::from_static(b"old")),
1816                &null_meta,
1817            )
1818            .unwrap();
1819        // Enable versioning and put two versions, the second a delete.
1820        store
1821            .put_bucket_meta(
1822                "b",
1823                &BucketMeta {
1824                    name: "b".to_string(),
1825                    versioning: Some("Enabled".to_string()),
1826                    ..Default::default()
1827                },
1828            )
1829            .unwrap();
1830        let mut v1 = sample_meta("k", 3);
1831        v1.version_id = Some("v1".to_string());
1832        v1.last_modified = base + chrono::Duration::seconds(1);
1833        store
1834            .put_object(
1835                "b",
1836                "k",
1837                Some("v1"),
1838                BodySource::Bytes(Bytes::from_static(b"new")),
1839                &v1,
1840            )
1841            .unwrap();
1842        let mut v2 = sample_meta("k", 5);
1843        v2.version_id = Some("v2".to_string());
1844        v2.last_modified = base + chrono::Duration::seconds(2);
1845        store
1846            .put_object(
1847                "b",
1848                "k",
1849                Some("v2"),
1850                BodySource::Bytes(Bytes::from_static(b"newer")),
1851                &v2,
1852            )
1853            .unwrap();
1854        let loaded = store.load().unwrap();
1855        let snap = loaded.buckets.get("b").unwrap();
1856        let current = snap
1857            .objects
1858            .get("k")
1859            .expect("latest live version must override stale null");
1860        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1861        assert_eq!(current.meta.size, 5);
1862    }
1863
1864    #[test]
1865    fn legacy_null_hidden_by_trailing_delete_marker() {
1866        let tmp = TempDir::new().unwrap();
1867        let store = new_store(&tmp);
1868        store
1869            .put_bucket_meta(
1870                "b",
1871                &BucketMeta {
1872                    name: "b".to_string(),
1873                    ..Default::default()
1874                },
1875            )
1876            .unwrap();
1877        let base = chrono::Utc::now();
1878        let mut null_meta = sample_meta("k", 3);
1879        null_meta.last_modified = base;
1880        store
1881            .put_object(
1882                "b",
1883                "k",
1884                None,
1885                BodySource::Bytes(Bytes::from_static(b"old")),
1886                &null_meta,
1887            )
1888            .unwrap();
1889        store
1890            .put_bucket_meta(
1891                "b",
1892                &BucketMeta {
1893                    name: "b".to_string(),
1894                    versioning: Some("Enabled".to_string()),
1895                    ..Default::default()
1896                },
1897            )
1898            .unwrap();
1899        let mut v1 = sample_meta("k", 3);
1900        v1.version_id = Some("v1".to_string());
1901        v1.last_modified = base + chrono::Duration::seconds(1);
1902        store
1903            .put_object(
1904                "b",
1905                "k",
1906                Some("v1"),
1907                BodySource::Bytes(Bytes::from_static(b"new")),
1908                &v1,
1909            )
1910            .unwrap();
1911        let mut dm = sample_meta("k", 0);
1912        dm.version_id = Some("dm1".to_string());
1913        dm.is_delete_marker = true;
1914        dm.last_modified = base + chrono::Duration::seconds(2);
1915        store
1916            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1917            .unwrap();
1918        let loaded = store.load().unwrap();
1919        let snap = loaded.buckets.get("b").unwrap();
1920        assert!(
1921            !snap.objects.contains_key("k"),
1922            "trailing delete marker must hide even a legacy null object",
1923        );
1924    }
1925
1926    #[test]
1927    fn delete_marker_roundtrip_no_body_file() {
1928        let tmp = TempDir::new().unwrap();
1929        let store = new_store(&tmp);
1930        store
1931            .put_bucket_meta(
1932                "b",
1933                &BucketMeta {
1934                    name: "b".to_string(),
1935                    versioning: Some("Enabled".to_string()),
1936                    ..Default::default()
1937                },
1938            )
1939            .unwrap();
1940        let mut m = sample_meta("k", 0);
1941        m.version_id = Some("dm1".to_string());
1942        m.is_delete_marker = true;
1943        store
1944            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1945            .unwrap();
1946        // No .bin file on disk for delete markers.
1947        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1948        assert!(!bin.exists(), "delete marker must not have a .bin file");
1949        assert!(toml_path.exists());
1950
1951        let loaded = store.load().unwrap();
1952        let versions = loaded
1953            .buckets
1954            .get("b")
1955            .unwrap()
1956            .object_versions
1957            .get("k")
1958            .unwrap();
1959        assert_eq!(versions.len(), 1);
1960        assert!(versions[0].meta.is_delete_marker);
1961        match &versions[0].body {
1962            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1963            _ => panic!("delete marker body should be empty Memory"),
1964        }
1965    }
1966
1967    #[test]
1968    fn mixed_nonversioned_and_versioned_buckets() {
1969        let tmp = TempDir::new().unwrap();
1970        let store = new_store(&tmp);
1971        store
1972            .put_bucket_meta(
1973                "a",
1974                &BucketMeta {
1975                    name: "a".to_string(),
1976                    ..Default::default()
1977                },
1978            )
1979            .unwrap();
1980        store
1981            .put_bucket_meta(
1982                "b",
1983                &BucketMeta {
1984                    name: "b".to_string(),
1985                    versioning: Some("Enabled".to_string()),
1986                    ..Default::default()
1987                },
1988            )
1989            .unwrap();
1990        let ma = sample_meta("only", 3);
1991        store
1992            .put_object(
1993                "a",
1994                "only",
1995                None,
1996                BodySource::Bytes(Bytes::from_static(b"aaa")),
1997                &ma,
1998            )
1999            .unwrap();
2000        let base = chrono::Utc::now();
2001        for (i, vid) in ["v1", "v2"].iter().enumerate() {
2002            let mut m = sample_meta("twice", 2);
2003            m.version_id = Some((*vid).to_string());
2004            m.last_modified = base + chrono::Duration::seconds(i as i64);
2005            store
2006                .put_object(
2007                    "b",
2008                    "twice",
2009                    Some(*vid),
2010                    BodySource::Bytes(Bytes::from_static(b"xx")),
2011                    &m,
2012                )
2013                .unwrap();
2014        }
2015        let loaded = store.load().unwrap();
2016        assert_eq!(loaded.buckets.len(), 2);
2017        let a = loaded.buckets.get("a").unwrap();
2018        assert_eq!(a.objects.len(), 1);
2019        assert!(a.object_versions.is_empty());
2020        let b = loaded.buckets.get("b").unwrap();
2021        // Fix #5: the latest live version is promoted into objects so
2022        // unversioned GETs see it post-restart.
2023        assert_eq!(
2024            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2025            Some("v2")
2026        );
2027        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2028    }
2029
2030    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2031        MpuInit {
2032            upload_id: upload_id.to_string(),
2033            key: key.to_string(),
2034            initiated: chrono::Utc::now(),
2035            content_type: "application/octet-stream".to_string(),
2036            storage_class: "STANDARD".to_string(),
2037            ..Default::default()
2038        }
2039    }
2040
2041    #[test]
2042    fn mpu_create_then_load_empty_parts() {
2043        let tmp = TempDir::new().unwrap();
2044        let store = new_store(&tmp);
2045        store
2046            .put_bucket_meta(
2047                "b",
2048                &BucketMeta {
2049                    name: "b".to_string(),
2050                    ..Default::default()
2051                },
2052            )
2053            .unwrap();
2054        let init = sample_mpu_init("up1", "k1");
2055        store.mpu_create("b", "up1", &init).unwrap();
2056        let loaded = store.load().unwrap();
2057        let snap = loaded.buckets.get("b").unwrap();
2058        let m = snap.multipart_uploads.get("up1").expect("upload present");
2059        assert_eq!(m.init.upload_id, "up1");
2060        assert_eq!(m.init.key, "k1");
2061        assert!(m.parts.is_empty());
2062    }
2063
2064    #[test]
2065    fn mpu_put_part_then_load_three_parts() {
2066        let tmp = TempDir::new().unwrap();
2067        let store = new_store(&tmp);
2068        store
2069            .put_bucket_meta(
2070                "b",
2071                &BucketMeta {
2072                    name: "b".to_string(),
2073                    ..Default::default()
2074                },
2075            )
2076            .unwrap();
2077        store
2078            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2079            .unwrap();
2080        let bodies = [
2081            Bytes::from_static(b"part-one"),
2082            Bytes::from_static(b"part-two-longer"),
2083            Bytes::from_static(b"p3"),
2084        ];
2085        for (i, body) in bodies.iter().enumerate() {
2086            let n = (i + 1) as u32;
2087            store
2088                .mpu_put_part(
2089                    "b",
2090                    "up",
2091                    n,
2092                    BodySource::Bytes(body.clone()),
2093                    &format!("et{}", n),
2094                )
2095                .unwrap();
2096        }
2097        let loaded = store.load().unwrap();
2098        let snap = loaded.buckets.get("b").unwrap();
2099        let m = snap.multipart_uploads.get("up").unwrap();
2100        assert_eq!(m.parts.len(), 3);
2101        for (i, body) in bodies.iter().enumerate() {
2102            let n = (i + 1) as u32;
2103            let part = m.parts.get(&n).unwrap();
2104            assert_eq!(part.meta.part_number, n);
2105            assert_eq!(part.meta.size, body.len() as u64);
2106            assert_eq!(part.meta.etag, format!("et{}", n));
2107            match &part.body {
2108                BodyRef::Disk { path, size, .. } => {
2109                    assert_eq!(*size, body.len() as u64);
2110                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2111                }
2112                _ => panic!("expected Disk"),
2113            }
2114        }
2115    }
2116
2117    #[test]
2118    fn mpu_abort_removes_upload() {
2119        let tmp = TempDir::new().unwrap();
2120        let store = new_store(&tmp);
2121        store
2122            .put_bucket_meta(
2123                "b",
2124                &BucketMeta {
2125                    name: "b".to_string(),
2126                    ..Default::default()
2127                },
2128            )
2129            .unwrap();
2130        store
2131            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2132            .unwrap();
2133        store
2134            .mpu_put_part(
2135                "b",
2136                "up",
2137                1,
2138                BodySource::Bytes(Bytes::from_static(b"x")),
2139                "e",
2140            )
2141            .unwrap();
2142        store.mpu_abort("b", "up").unwrap();
2143        assert!(!store.mpu_dir("b", "up").exists());
2144        // Idempotent.
2145        store.mpu_abort("b", "up").unwrap();
2146        let loaded = store.load().unwrap();
2147        let snap = loaded.buckets.get("b").unwrap();
2148        assert!(snap.multipart_uploads.is_empty());
2149    }
2150
2151    #[test]
2152    fn mpu_complete_single_part_fast_path() {
2153        let tmp = TempDir::new().unwrap();
2154        let store = new_store(&tmp);
2155        store
2156            .put_bucket_meta(
2157                "b",
2158                &BucketMeta {
2159                    name: "b".to_string(),
2160                    ..Default::default()
2161                },
2162            )
2163            .unwrap();
2164        store
2165            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2166            .unwrap();
2167        let body = Bytes::from_static(b"only-part-bytes");
2168        store
2169            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2170            .unwrap();
2171        let meta = sample_meta("k", body.len() as u64);
2172        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2173        match &body_ref {
2174            BodyRef::Disk { path, size, .. } => {
2175                assert_eq!(*size, body.len() as u64);
2176                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2177            }
2178            _ => panic!("expected Disk"),
2179        }
2180        assert!(!store.mpu_dir("b", "up").exists());
2181    }
2182
2183    #[test]
2184    fn mpu_complete_multi_part_concat() {
2185        let tmp = TempDir::new().unwrap();
2186        let store = new_store(&tmp);
2187        store
2188            .put_bucket_meta(
2189                "b",
2190                &BucketMeta {
2191                    name: "b".to_string(),
2192                    ..Default::default()
2193                },
2194            )
2195            .unwrap();
2196        store
2197            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2198            .unwrap();
2199        let p1 = Bytes::from_static(b"AAAA");
2200        let p2 = Bytes::from_static(b"BBBBBB");
2201        let p3 = Bytes::from_static(b"CC");
2202        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2203            store
2204                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2205                .unwrap();
2206        }
2207        let mut expected = Vec::new();
2208        expected.extend_from_slice(&p1);
2209        expected.extend_from_slice(&p2);
2210        expected.extend_from_slice(&p3);
2211        let meta = sample_meta("k", expected.len() as u64);
2212        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2213        let path = match body_ref {
2214            BodyRef::Disk { path, size, .. } => {
2215                assert_eq!(size, expected.len() as u64);
2216                path
2217            }
2218            _ => panic!("expected Disk"),
2219        };
2220        assert_eq!(std::fs::read(&path).unwrap(), expected);
2221        assert!(!store.mpu_dir("b", "up").exists());
2222    }
2223
2224    #[test]
2225    fn mpu_complete_large_streaming_via_file_source() {
2226        let tmp = TempDir::new().unwrap();
2227        let store = new_store(&tmp);
2228        store
2229            .put_bucket_meta(
2230                "b",
2231                &BucketMeta {
2232                    name: "b".to_string(),
2233                    ..Default::default()
2234                },
2235            )
2236            .unwrap();
2237        store
2238            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2239            .unwrap();
2240
2241        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2242        // exercise the BodySource::File + streaming concat path).
2243        const PART_SIZE: usize = 1024 * 1024;
2244        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2245        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2246        for (i, byte) in patterns.iter().enumerate() {
2247            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2248            let data = vec![*byte; PART_SIZE];
2249            std::fs::write(&src, &data).unwrap();
2250            expected.extend_from_slice(&data);
2251            store
2252                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2253                .unwrap();
2254        }
2255        let meta = sample_meta("k", expected.len() as u64);
2256        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2257        let path = match body_ref {
2258            BodyRef::Disk { path, size, .. } => {
2259                assert_eq!(size, expected.len() as u64);
2260                path
2261            }
2262            _ => panic!("expected Disk"),
2263        };
2264        let actual = std::fs::read(&path).unwrap();
2265        assert_eq!(actual.len(), expected.len());
2266        assert_eq!(actual, expected);
2267        assert!(!store.mpu_dir("b", "up").exists());
2268    }
2269
2270    #[test]
2271    fn mpu_resumable_across_load() {
2272        let tmp = TempDir::new().unwrap();
2273        let store = new_store(&tmp);
2274        store
2275            .put_bucket_meta(
2276                "b",
2277                &BucketMeta {
2278                    name: "b".to_string(),
2279                    ..Default::default()
2280                },
2281            )
2282            .unwrap();
2283        store
2284            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2285            .unwrap();
2286        let p1 = Bytes::from_static(b"hello-");
2287        let p2 = Bytes::from_static(b"world-");
2288        store
2289            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2290            .unwrap();
2291        store
2292            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2293            .unwrap();
2294
2295        // Simulate a restart: re-open the store on the same dir and load.
2296        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2297        let loaded = store2.load().unwrap();
2298        let snap = loaded.buckets.get("b").unwrap();
2299        let m = snap.multipart_uploads.get("up").unwrap();
2300        assert_eq!(m.parts.len(), 2);
2301        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2302        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2303
2304        // Continue the upload on the fresh store.
2305        let p3 = Bytes::from_static(b"again!");
2306        store2
2307            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2308            .unwrap();
2309        let mut expected = Vec::new();
2310        expected.extend_from_slice(&p1);
2311        expected.extend_from_slice(&p2);
2312        expected.extend_from_slice(&p3);
2313        let meta = sample_meta("k", expected.len() as u64);
2314        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2315        let path = match body_ref {
2316            BodyRef::Disk { path, .. } => path,
2317            _ => panic!(),
2318        };
2319        assert_eq!(std::fs::read(&path).unwrap(), expected);
2320        assert!(!store2.mpu_dir("b", "up").exists());
2321    }
2322
2323    #[test]
2324    fn tags_snapshot_roundtrip_via_store() {
2325        let tmp = TempDir::new().unwrap();
2326        let store = new_store(&tmp);
2327        store
2328            .put_bucket_meta(
2329                "b",
2330                &BucketMeta {
2331                    name: "b".to_string(),
2332                    ..Default::default()
2333                },
2334            )
2335            .unwrap();
2336        let mut tags = BTreeMap::new();
2337        tags.insert("env".to_string(), "prod".to_string());
2338        tags.insert("team".to_string(), "s3".to_string());
2339        let snap = TagsSnapshot { tags: tags.clone() };
2340        let payload = toml::to_string(&snap).unwrap();
2341        store
2342            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2343            .unwrap();
2344        let loaded = store.load().unwrap();
2345        let text = loaded
2346            .buckets
2347            .get("b")
2348            .unwrap()
2349            .subresources
2350            .get("tags.toml")
2351            .cloned()
2352            .unwrap();
2353        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2354        assert_eq!(decoded.tags, tags);
2355    }
2356
2357    #[test]
2358    fn acl_snapshot_roundtrip_via_store() {
2359        let tmp = TempDir::new().unwrap();
2360        let store = new_store(&tmp);
2361        store
2362            .put_bucket_meta(
2363                "b",
2364                &BucketMeta {
2365                    name: "b".to_string(),
2366                    ..Default::default()
2367                },
2368            )
2369            .unwrap();
2370        let snap = AclSnapshot {
2371            owner_id: "owner-abc".to_string(),
2372            grants: vec![
2373                AclGrantSnapshot {
2374                    grantee_type: "CanonicalUser".to_string(),
2375                    grantee_id: Some("owner-abc".to_string()),
2376                    grantee_display_name: Some("owner".to_string()),
2377                    grantee_uri: None,
2378                    permission: "FULL_CONTROL".to_string(),
2379                },
2380                AclGrantSnapshot {
2381                    grantee_type: "Group".to_string(),
2382                    grantee_id: None,
2383                    grantee_display_name: None,
2384                    grantee_uri: Some(
2385                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2386                    ),
2387                    permission: "READ".to_string(),
2388                },
2389            ],
2390        };
2391        let payload = toml::to_string(&snap).unwrap();
2392        store
2393            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2394            .unwrap();
2395        let loaded = store.load().unwrap();
2396        let text = loaded
2397            .buckets
2398            .get("b")
2399            .unwrap()
2400            .subresources
2401            .get("acl.toml")
2402            .cloned()
2403            .unwrap();
2404        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2405        assert_eq!(decoded.owner_id, "owner-abc");
2406        assert_eq!(decoded.grants.len(), 2);
2407        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2408        assert_eq!(decoded.grants[1].grantee_type, "Group");
2409    }
2410
2411    #[test]
2412    fn inventory_snapshot_roundtrip_via_store() {
2413        let tmp = TempDir::new().unwrap();
2414        let store = new_store(&tmp);
2415        store
2416            .put_bucket_meta(
2417                "b",
2418                &BucketMeta {
2419                    name: "b".to_string(),
2420                    ..Default::default()
2421                },
2422            )
2423            .unwrap();
2424        let mut configs = BTreeMap::new();
2425        configs.insert(
2426            "inv-1".to_string(),
2427            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2428        );
2429        configs.insert(
2430            "inv-2".to_string(),
2431            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2432        );
2433        let snap = InventorySnapshot {
2434            configs: configs.clone(),
2435        };
2436        let payload = toml::to_string(&snap).unwrap();
2437        store
2438            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2439            .unwrap();
2440        let loaded = store.load().unwrap();
2441        let text = loaded
2442            .buckets
2443            .get("b")
2444            .unwrap()
2445            .subresources
2446            .get("inventory.toml")
2447            .cloned()
2448            .unwrap();
2449        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2450        assert_eq!(decoded.configs, configs);
2451    }
2452
2453    #[test]
2454    fn legacy_versioning_file_is_read() {
2455        let tmp = TempDir::new().unwrap();
2456        let store = new_store(&tmp);
2457        // Bucket meta with no versioning field set.
2458        store
2459            .put_bucket_meta(
2460                "b",
2461                &BucketMeta {
2462                    name: "b".to_string(),
2463                    ..Default::default()
2464                },
2465            )
2466            .unwrap();
2467        // Legacy sidecar: a bare versioning.toml with "Enabled".
2468        let path = store.bucket_dir("b").join("versioning.toml");
2469        std::fs::write(&path, "Enabled").unwrap();
2470        let loaded = store.load().unwrap();
2471        let snap = loaded.buckets.get("b").unwrap();
2472        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2473    }
2474
2475    #[test]
2476    fn load_skips_corrupt_bucket_and_keeps_others() {
2477        // One corrupt bucket must be skipped with a warning, not abort the
2478        // whole store load (bug-audit 2026-06-20, 4.3).
2479        let tmp = TempDir::new().unwrap();
2480        let store = new_store(&tmp);
2481        store
2482            .put_bucket_meta(
2483                "good",
2484                &BucketMeta {
2485                    name: "good".to_string(),
2486                    ..Default::default()
2487                },
2488            )
2489            .unwrap();
2490        store
2491            .put_object(
2492                "good",
2493                "k",
2494                None,
2495                BodySource::Bytes(Bytes::from_static(b"hi")),
2496                &sample_meta("k", 2),
2497            )
2498            .unwrap();
2499
2500        let bad_dir = store.buckets_dir().join("bad");
2501        std::fs::create_dir_all(&bad_dir).unwrap();
2502        std::fs::write(bad_dir.join("meta.toml"), b"not valid toml = = =").unwrap();
2503
2504        let loaded = store.load().unwrap();
2505        assert!(
2506            loaded.buckets.contains_key("good"),
2507            "valid bucket must load"
2508        );
2509        assert!(
2510            !loaded.buckets.contains_key("bad"),
2511            "corrupt bucket must be skipped, not abort the load"
2512        );
2513    }
2514}