Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Complete persisted S3 state as returned by [`S3Store::load`].
///
/// `DiskS3Store::load` only fills `buckets`; `account_id` and `region` are
/// left at their defaults there.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    #[serde(default)]
    pub account_id: String,
    #[serde(default)]
    pub region: String,
    /// One snapshot per bucket, keyed by `BucketMeta::name`.
    #[serde(default)]
    pub buckets: HashMap<String, BucketSnapshot>,
}

/// Everything restored for a single bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    pub meta: BucketMeta,
    /// The entry currently visible for each key. When a key has versioned
    /// entries, `DiskS3Store::load` sets this from the newest one (or removes
    /// it if the newest is a delete marker).
    #[serde(default)]
    pub objects: HashMap<String, LoadedObject>,
    /// Versioned entries per key; `DiskS3Store::load` sorts them oldest-first
    /// by `ObjectMeta::last_modified`.
    #[serde(default)]
    pub object_versions: HashMap<String, Vec<LoadedObject>>,
    /// Raw subresource documents keyed by their file name (e.g. `"cors.toml"`).
    #[serde(default)]
    pub subresources: HashMap<String, String>,
    /// In-progress multipart uploads keyed by upload id.
    #[serde(default)]
    pub multipart_uploads: HashMap<String, LoadedMpu>,
}

/// A multipart upload restored from storage: its creation parameters plus
/// the parts uploaded so far, ordered by part number.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    pub init: MpuInit,
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}

/// One uploaded multipart part: its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    pub meta: UploadPartMeta,
    #[serde(default)]
    pub body: BodyRef,
}

/// One object (or object version): its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    pub meta: ObjectMeta,
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata; `DiskS3Store` stores it as `meta.toml` in the
/// bucket directory. Every field tolerates being absent from the file.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name. `DiskS3Store::load` keys the bucket by this value, not by
    /// the (escaped) directory name.
    #[serde(default)]
    pub name: String,
    /// Falls back to `DateTime::<Utc>::MIN_UTC` (via `default_time`) when absent.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    #[serde(default)]
    pub region: String,
    /// Versioning status string. If this is `None`, `DiskS3Store::load` fills
    /// it from the trimmed contents of a non-empty `versioning.toml`.
    #[serde(default)]
    pub versioning: Option<String>,
    #[serde(default)]
    pub acl: Option<String>,
    #[serde(default)]
    pub acl_owner_id: String,
    #[serde(default)]
    pub accelerate_status: Option<String>,
    #[serde(default)]
    pub eventbridge_enabled: bool,
}
72
73fn default_time() -> DateTime<Utc> {
74    DateTime::<Utc>::MIN_UTC
75}
76
/// A single ACL grant as persisted (embedded in `ObjectMeta::acl_grants` and
/// `MpuInit::acl_grants`). `grantee_type` and `permission` are required when
/// deserializing; the grantee identifiers are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    pub grantee_type: String,
    #[serde(default)]
    pub grantee_id: Option<String>,
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    #[serde(default)]
    pub grantee_uri: Option<String>,
    pub permission: String,
}

/// Serializable tag set (key -> value).
// NOTE(review): not referenced in this part of the file; presumably used as a
// subresource payload format elsewhere — confirm.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    #[serde(default)]
    pub tags: HashMap<String, String>,
}

/// Serializable ACL: owner id plus its list of grants.
// NOTE(review): not referenced in this part of the file — confirm usage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    #[serde(default)]
    pub owner_id: String,
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}

/// Serializable inventory configurations, keyed by a string id.
// NOTE(review): presumably id -> serialized config document; verify against
// the writer of `inventory.toml`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    #[serde(default)]
    pub configs: HashMap<String, String>,
}
108
/// Metadata for one object version. `DiskS3Store` stores it as the
/// `<version>.toml` sidecar next to the `<version>.bin` body. Every field
/// tolerates being absent from the file.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses this, not the (escaped) directory
    /// name, as the map key.
    #[serde(default)]
    pub key: String,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub etag: String,
    #[serde(default)]
    pub size: u64,
    /// Falls back to `DateTime::<Utc>::MIN_UTC` when absent. `DiskS3Store::load`
    /// orders a key's versions by this field.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub tags: HashMap<String, String>,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    #[serde(default)]
    pub parts_count: Option<u32>,
    // NOTE(review): presumably (part_number, part_size) pairs for objects
    // assembled from a multipart upload — confirm with the writer.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// `None` combined with the `null` file tag marks an unversioned object in
    /// `DiskS3Store::load`; anything else is treated as a versioned entry.
    #[serde(default)]
    pub version_id: Option<String>,
    /// Delete markers are persisted as a sidecar only; no body file is read.
    #[serde(default)]
    pub is_delete_marker: bool,
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    #[serde(default)]
    pub restore_expiry: Option<String>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    #[serde(default)]
    pub checksum_value: Option<String>,
    #[serde(default)]
    pub lock_mode: Option<String>,
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    #[serde(default)]
    pub content_encoding: Option<String>,
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
164
/// Parameters captured when a multipart upload starts. `DiskS3Store::load`
/// reads it back from `mpu/<upload_id>/init.toml`. `upload_id` and `key` are
/// required when deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    pub upload_id: String,
    pub key: String,
    /// Falls back to `DateTime::<Utc>::MIN_UTC` when absent.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    // NOTE(review): raw tagging string as received at upload creation —
    // presumably the URL-encoded header form; confirm.
    #[serde(default)]
    pub tagging: Option<String>,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}

/// Metadata for one uploaded part, stored as `parts/<n>.toml` next to the
/// `parts/<n>.bin` body. Only `last_modified` may be absent when
/// deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    pub part_number: u32,
    pub etag: String,
    pub size: u64,
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
197
/// Bucket configuration documents that are stored separately from
/// [`BucketMeta`]. `DiskS3Store` keeps each kind in its own file inside the
/// bucket directory (see `DiskS3Store::subresource_filename`).
///
/// When adding a variant, also add it to [`ALL_SUBRESOURCES`]:
/// `DiskS3Store::load` only looks for the kinds listed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
224
/// Every [`BucketSubresource`] variant, maintained by hand.
///
/// `DiskS3Store::load` iterates this list to discover subresource files, so a
/// variant missing from it is written to disk but never loaded back.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
    BucketSubresource::Analytics,
    BucketSubresource::IntelligentTiering,
    BucketSubresource::Metrics,
    BucketSubresource::RequestPayment,
    BucketSubresource::Abac,
    BucketSubresource::MetadataConfiguration,
    BucketSubresource::MetadataTableConfiguration,
];
250
/// Handle to an object or multipart-part body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Body bytes held in memory. Marked `#[serde(skip)]`: serde never
    /// produces this variant when deserializing, and serializing it fails.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file. For multipart parts, `DiskS3Store::load` sets
    /// `key` to `__mpu__/<upload_id>` and `version` to `part-<n>`.
    Disk {
        bucket: String,
        key: String,
        /// `None` for the unversioned (`null`) entry.
        #[serde(default)]
        version: Option<String>,
        /// Location of the body file.
        path: PathBuf,
        /// Body length in bytes, recorded when the reference was created.
        size: u64,
    },
}
264
265impl BodyRef {
266    pub fn size(&self) -> u64 {
267        match self {
268            BodyRef::Memory(b) => b.len() as u64,
269            BodyRef::Disk { size, .. } => *size,
270        }
271    }
272}
273
274impl Default for BodyRef {
275    fn default() -> Self {
276        BodyRef::Memory(Bytes::new())
277    }
278}
279
/// Where the body handed to `put_object` / `mpu_put_part` comes from.
#[derive(Debug)]
pub enum BodySource {
    /// Body bytes already in memory.
    Bytes(Bytes),
    /// Existing disk path that should be *moved* into the destination via
    /// rename (upload-tmp → final) and consumed.
    File(PathBuf),
    /// Existing disk path that should be *copied* into the destination and
    /// left in place. Used by replication so the source object stays
    /// available while the replica is produced.
    FileCopy(PathBuf),
}
291
/// Errors returned by [`S3Store`] implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying filesystem failure; `?` on `std::io::Result` converts into this.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A persisted document failed to (de)serialize; carries the error text
    /// (e.g. a TOML parse error in `DiskS3Store::load`).
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not supported by this store.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by its message (e.g. a missing body file).
    #[error("{0}")]
    Other(String),
}

/// Result alias used throughout the S3 store API.
pub type StoreResult<T> = Result<T, StoreError>;
305
/// Persistence backend for the emulated S3 state.
///
/// This file provides [`MemoryS3Store`] (stores nothing) and [`DiskS3Store`]
/// (TOML sidecars plus `.bin` body files under a root directory).
/// Implementations must be `Send + Sync`.
pub trait S3Store: Send + Sync {
    /// Reconstructs all persisted state (empty for a store that persists nothing).
    fn load(&self) -> StoreResult<S3State>;

    /// Stores the metadata of `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Stores the raw `payload` document for one subresource of `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Removes one subresource of `bucket` (the disk store treats an absent
    /// subresource as success).
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Removes everything stored for `bucket`.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Stores one object version's body and metadata and returns a reference
    /// to the stored body. `version == None` addresses the unversioned entry.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Rewrites only the metadata of an object version (the disk store leaves
    /// the body file untouched).
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Removes one object version's body and metadata.
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Returns the full contents of the body referenced by `body`.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Records the start of multipart upload `upload_id` in `bucket`.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Stores the body of part `part_number` of `upload_id`, with its `etag`.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<()>;
    /// Aborts multipart upload `upload_id`.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Completes `upload_id` as object `final_key` (at `version`, with `meta`)
    /// and returns a reference to the resulting body.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
356
/// Stateless store used when persistence is disabled; see its [`S3Store`] impl.
#[derive(Default)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the (stateless) in-memory store.
    pub fn new() -> Self {
        Self::default()
    }
}
370
371impl S3Store for MemoryS3Store {
372    fn load(&self) -> StoreResult<S3State> {
373        Ok(S3State::default())
374    }
375
376    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
377        Ok(())
378    }
379    fn put_bucket_subresource(
380        &self,
381        _bucket: &str,
382        _kind: BucketSubresource,
383        _payload: &str,
384    ) -> StoreResult<()> {
385        Ok(())
386    }
387    fn delete_bucket_subresource(
388        &self,
389        _bucket: &str,
390        _kind: BucketSubresource,
391    ) -> StoreResult<()> {
392        Ok(())
393    }
394    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
395        Ok(())
396    }
397
398    fn put_object(
399        &self,
400        _bucket: &str,
401        _key: &str,
402        _version: Option<&str>,
403        body: BodySource,
404        _meta: &ObjectMeta,
405    ) -> StoreResult<BodyRef> {
406        match body {
407            BodySource::Bytes(b) => Ok(BodyRef::Memory(b)),
408            BodySource::File(_) | BodySource::FileCopy(_) => Err(StoreError::Other(
409                "file-backed put not supported in memory mode".to_string(),
410            )),
411        }
412    }
413    fn put_object_meta(
414        &self,
415        _bucket: &str,
416        _key: &str,
417        _version: Option<&str>,
418        _meta: &ObjectMeta,
419    ) -> StoreResult<()> {
420        Ok(())
421    }
422    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
423        Ok(())
424    }
425    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
426        match body {
427            BodyRef::Memory(b) => Ok(b.clone()),
428            BodyRef::Disk { .. } => {
429                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
430            }
431        }
432    }
433
434    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
435        Ok(())
436    }
437    fn mpu_put_part(
438        &self,
439        _bucket: &str,
440        _upload_id: &str,
441        _part_number: u32,
442        _body: BodySource,
443        _etag: &str,
444    ) -> StoreResult<()> {
445        Ok(())
446    }
447    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
448        Ok(())
449    }
450    fn mpu_complete(
451        &self,
452        _bucket: &str,
453        _upload_id: &str,
454        _final_key: &str,
455        _version: Option<&str>,
456        _meta: &ObjectMeta,
457    ) -> StoreResult<BodyRef> {
458        Ok(BodyRef::Memory(Bytes::new()))
459    }
460}
461
/// [`S3Store`] that persists everything under `root` with this layout:
///
/// ```text
/// <root>/buckets/<bucket>/meta.toml
/// <root>/buckets/<bucket>/<subresource>.toml
/// <root>/buckets/<bucket>/objects/<key>/<version>.bin   (body)
/// <root>/buckets/<bucket>/objects/<key>/<version>.toml  (ObjectMeta sidecar)
/// <root>/buckets/<bucket>/mpu/<upload_id>/init.toml
/// <root>/buckets/<bucket>/mpu/<upload_id>/parts/<n>.{bin,toml}
/// ```
///
/// Bucket names, keys and upload ids are escaped with
/// `key_escape::escape_key_segment`. `<version>` is `null` for unversioned
/// entries. Bodies are cached in `cache`, keyed by (bucket, key, version).
pub struct DiskS3Store {
    root: PathBuf,
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
466
467impl DiskS3Store {
468    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
469        Self { root, cache }
470    }
471
472    fn buckets_dir(&self) -> PathBuf {
473        self.root.join("buckets")
474    }
475
476    fn bucket_dir(&self, bucket: &str) -> PathBuf {
477        self.buckets_dir()
478            .join(crate::key_escape::escape_key_segment(bucket))
479    }
480
481    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
482        self.bucket_dir(bucket)
483            .join("objects")
484            .join(crate::key_escape::escape_key_segment(key))
485    }
486
487    fn version_tag(version: Option<&str>) -> String {
488        version.unwrap_or("null").to_string()
489    }
490
491    fn object_paths(
492        &self,
493        bucket: &str,
494        key: &str,
495        version: Option<&str>,
496    ) -> (PathBuf, PathBuf, PathBuf) {
497        let dir = self.object_dir(bucket, key);
498        let tag = Self::version_tag(version);
499        let bin = dir.join(format!("{}.bin", tag));
500        let toml = dir.join(format!("{}.toml", tag));
501        (dir, bin, toml)
502    }
503
504    fn subresource_filename(kind: BucketSubresource) -> &'static str {
505        match kind {
506            BucketSubresource::Tags => "tags.toml",
507            BucketSubresource::Lifecycle => "lifecycle.toml",
508            BucketSubresource::Cors => "cors.toml",
509            BucketSubresource::Policy => "policy.toml",
510            BucketSubresource::Notification => "notification.toml",
511            BucketSubresource::Logging => "logging.toml",
512            BucketSubresource::Website => "website.toml",
513            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
514            BucketSubresource::ObjectLock => "object_lock.toml",
515            BucketSubresource::Replication => "replication.toml",
516            BucketSubresource::Ownership => "ownership.toml",
517            BucketSubresource::Inventory => "inventory.toml",
518            BucketSubresource::Encryption => "encryption.toml",
519            BucketSubresource::Versioning => "versioning.toml",
520            BucketSubresource::Acl => "acl.toml",
521            BucketSubresource::Accelerate => "accelerate.toml",
522            BucketSubresource::Analytics => "analytics.toml",
523            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
524            BucketSubresource::Metrics => "metrics.toml",
525            BucketSubresource::RequestPayment => "request_payment.toml",
526            BucketSubresource::Abac => "abac.toml",
527            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
528            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
529        }
530    }
531
532    fn cleanup_empty(dir: &std::path::Path) {
533        let _ = std::fs::remove_dir(dir);
534    }
535
536    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
537        self.bucket_dir(bucket)
538            .join("mpu")
539            .join(crate::key_escape::escape_key_segment(upload_id))
540    }
541
542    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
543        self.mpu_dir(bucket, upload_id).join("parts")
544    }
545
546    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
547        self.mpu_parts_dir(bucket, upload_id)
548            .join(format!("{}.bin", part_number))
549    }
550
551    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
552        self.mpu_parts_dir(bucket, upload_id)
553            .join(format!("{}.toml", part_number))
554    }
555
556    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
557        crate::cache::BodyKey::new(
558            bucket.to_string(),
559            format!("__mpu__/{}", upload_id),
560            Some(format!("part-{}", part_number)),
561        )
562    }
563}
564
565fn io_other(msg: impl Into<String>) -> StoreError {
566    StoreError::Other(msg.into())
567}
568
569impl S3Store for DiskS3Store {
570    fn load(&self) -> StoreResult<S3State> {
571        let mut state = S3State::default();
572        let buckets_dir = self.buckets_dir();
573        if !buckets_dir.exists() {
574            return Ok(state);
575        }
576        for entry in std::fs::read_dir(&buckets_dir)? {
577            let entry = entry?;
578            if !entry.file_type()?.is_dir() {
579                continue;
580            }
581            let bdir = entry.path();
582            let meta_path = bdir.join("meta.toml");
583            if !meta_path.exists() {
584                continue;
585            }
586            let meta_text = std::fs::read_to_string(&meta_path)?;
587            let mut meta: BucketMeta =
588                toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
589            let mut snap = BucketSnapshot {
590                meta: meta.clone(),
591                objects: HashMap::new(),
592                object_versions: HashMap::new(),
593                subresources: HashMap::new(),
594                multipart_uploads: HashMap::new(),
595            };
596
597            for kind in ALL_SUBRESOURCES {
598                let fname = Self::subresource_filename(*kind);
599                let path = bdir.join(fname);
600                if path.exists() {
601                    let text = std::fs::read_to_string(&path)?;
602                    if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
603                        let stripped = text.trim();
604                        if !stripped.is_empty() {
605                            snap.meta.versioning = Some(stripped.to_string());
606                            meta.versioning = snap.meta.versioning.clone();
607                        }
608                    }
609                    snap.subresources.insert(fname.to_string(), text);
610                }
611            }
612
613            let objects_root = bdir.join("objects");
614            if objects_root.exists() {
615                for okey_entry in std::fs::read_dir(&objects_root)? {
616                    let okey_entry = okey_entry?;
617                    if !okey_entry.file_type()?.is_dir() {
618                        continue;
619                    }
620                    let key_dir = okey_entry.path();
621                    let mut versioned: Vec<LoadedObject> = Vec::new();
622                    let mut key_name: Option<String> = None;
623                    for version_entry in std::fs::read_dir(&key_dir)? {
624                        let version_entry = version_entry?;
625                        let path = version_entry.path();
626                        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
627                            continue;
628                        };
629                        if !fname.ends_with(".toml") {
630                            continue;
631                        }
632                        let version_tag = &fname[..fname.len() - 5];
633                        let toml_text = std::fs::read_to_string(&path)?;
634                        let obj_meta: ObjectMeta = toml::from_str(&toml_text)
635                            .map_err(|e| StoreError::Serde(e.to_string()))?;
636                        let bin_path = key_dir.join(format!("{}.bin", version_tag));
637                        let (body, size) = if obj_meta.is_delete_marker {
638                            (BodyRef::Memory(Bytes::new()), 0u64)
639                        } else if bin_path.exists() {
640                            let sz = std::fs::metadata(&bin_path)?.len();
641                            (
642                                BodyRef::Disk {
643                                    bucket: meta.name.clone(),
644                                    key: obj_meta.key.clone(),
645                                    version: if version_tag == "null" {
646                                        None
647                                    } else {
648                                        Some(version_tag.to_string())
649                                    },
650                                    path: bin_path,
651                                    size: sz,
652                                },
653                                sz,
654                            )
655                        } else {
656                            // Fail loud: the sidecar says this object has a
657                            // body but the .bin file is missing. Returning
658                            // silently would hand the caller a truncated
659                            // object and hide data loss.
660                            return Err(StoreError::Other(format!(
661                                "missing body file: {}",
662                                bin_path.display()
663                            )));
664                        };
665                        let _ = size;
666                        key_name.get_or_insert_with(|| obj_meta.key.clone());
667                        if version_tag == "null" && obj_meta.version_id.is_none() {
668                            snap.objects.insert(
669                                obj_meta.key.clone(),
670                                LoadedObject {
671                                    meta: obj_meta,
672                                    body,
673                                },
674                            );
675                        } else {
676                            versioned.push(LoadedObject {
677                                meta: obj_meta,
678                                body,
679                            });
680                        }
681                    }
682                    if !versioned.is_empty() {
683                        versioned.sort_by_key(|v| v.meta.last_modified);
684                        if let Some(key) = key_name {
685                            // Reconcile snap.objects with the newest version:
686                            // a trailing delete marker hides any prior null or
687                            // live version (remove it); otherwise overwrite
688                            // with the newest live version, even if a
689                            // pre-versioning null.toml had already been
690                            // inserted during the non-versioned scan.
691                            match versioned.last() {
692                                Some(newest) if newest.meta.is_delete_marker => {
693                                    snap.objects.remove(&key);
694                                }
695                                Some(newest) => {
696                                    snap.objects.insert(key.clone(), newest.clone());
697                                }
698                                None => {}
699                            }
700                            snap.object_versions.insert(key, versioned);
701                        }
702                    }
703                }
704            }
705
706            let mpu_root = bdir.join("mpu");
707            if mpu_root.exists() {
708                for upload_entry in std::fs::read_dir(&mpu_root)? {
709                    let upload_entry = upload_entry?;
710                    if !upload_entry.file_type()?.is_dir() {
711                        continue;
712                    }
713                    let upload_dir = upload_entry.path();
714                    let init_path = upload_dir.join("init.toml");
715                    if !init_path.exists() {
716                        continue;
717                    }
718                    let init_text = std::fs::read_to_string(&init_path)?;
719                    let init: MpuInit =
720                        toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
721                    let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
722                    let parts_dir = upload_dir.join("parts");
723                    if parts_dir.exists() {
724                        for part_entry in std::fs::read_dir(&parts_dir)? {
725                            let part_entry = part_entry?;
726                            let path = part_entry.path();
727                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
728                                continue;
729                            };
730                            if !fname.ends_with(".toml") {
731                                continue;
732                            }
733                            let stem = &fname[..fname.len() - 5];
734                            let Ok(part_number) = stem.parse::<u32>() else {
735                                continue;
736                            };
737                            let toml_text = std::fs::read_to_string(&path)?;
738                            let part_meta: UploadPartMeta = toml::from_str(&toml_text)
739                                .map_err(|e| StoreError::Serde(e.to_string()))?;
740                            let bin_path = parts_dir.join(format!("{}.bin", part_number));
741                            if !bin_path.exists() {
742                                return Err(StoreError::Other(format!(
743                                    "missing multipart part body file: {}",
744                                    bin_path.display()
745                                )));
746                            }
747                            let sz = std::fs::metadata(&bin_path)?.len();
748                            let body = BodyRef::Disk {
749                                bucket: meta.name.clone(),
750                                key: format!("__mpu__/{}", init.upload_id),
751                                version: Some(format!("part-{}", part_number)),
752                                path: bin_path,
753                                size: sz,
754                            };
755                            loaded_parts.insert(
756                                part_number,
757                                LoadedPart {
758                                    meta: part_meta,
759                                    body,
760                                },
761                            );
762                        }
763                    }
764                    snap.multipart_uploads.insert(
765                        init.upload_id.clone(),
766                        LoadedMpu {
767                            init,
768                            parts: loaded_parts,
769                        },
770                    );
771                }
772            }
773
774            state.buckets.insert(meta.name.clone(), snap);
775        }
776        Ok(state)
777    }
778
779    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
780        let dir = self.bucket_dir(bucket);
781        std::fs::create_dir_all(&dir)?;
782        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
783        Ok(())
784    }
785
786    fn put_bucket_subresource(
787        &self,
788        bucket: &str,
789        kind: BucketSubresource,
790        payload: &str,
791    ) -> StoreResult<()> {
792        let dir = self.bucket_dir(bucket);
793        std::fs::create_dir_all(&dir)?;
794        let path = dir.join(Self::subresource_filename(kind));
795        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
796        Ok(())
797    }
798
799    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
800        let path = self
801            .bucket_dir(bucket)
802            .join(Self::subresource_filename(kind));
803        match std::fs::remove_file(&path) {
804            Ok(_) => Ok(()),
805            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
806            Err(e) => Err(e.into()),
807        }
808    }
809
810    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
811        let dir = self.bucket_dir(bucket);
812        match std::fs::remove_dir_all(&dir) {
813            Ok(_) => Ok(()),
814            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
815            Err(e) => Err(e.into()),
816        }
817    }
818
819    fn put_object(
820        &self,
821        bucket: &str,
822        key: &str,
823        version: Option<&str>,
824        body: BodySource,
825        meta: &ObjectMeta,
826    ) -> StoreResult<BodyRef> {
827        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
828        std::fs::create_dir_all(&dir)?;
829
830        if meta.is_delete_marker {
831            crate::atomic::write_atomic_toml(&toml_path, meta)?;
832            return Ok(BodyRef::Memory(Bytes::new()));
833        }
834
835        let size: u64;
836        let bytes_for_cache: Option<Bytes>;
837        match body {
838            BodySource::Bytes(b) => {
839                size = b.len() as u64;
840                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
841                bytes_for_cache = Some(b);
842            }
843            BodySource::File(src) => {
844                let src_size = std::fs::metadata(&src)?.len();
845                size = src_size;
846                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
847                bytes_for_cache = None;
848            }
849            BodySource::FileCopy(src) => {
850                let src_size = std::fs::metadata(&src)?.len();
851                size = src_size;
852                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
853                bytes_for_cache = None;
854            }
855        }
856
857        crate::atomic::write_atomic_toml(&toml_path, meta)?;
858
859        let body_key = crate::cache::BodyKey::new(
860            bucket.to_string(),
861            key.to_string(),
862            version.map(|s| s.to_string()),
863        );
864        if let Some(b) = bytes_for_cache {
865            self.cache.insert(body_key, b);
866        } else {
867            self.cache.invalidate(&crate::cache::BodyKey::new(
868                bucket.to_string(),
869                key.to_string(),
870                version.map(|s| s.to_string()),
871            ));
872        }
873
874        Ok(BodyRef::Disk {
875            bucket: bucket.to_string(),
876            key: key.to_string(),
877            version: version.map(|s| s.to_string()),
878            path: bin_path,
879            size,
880        })
881    }
882
883    fn put_object_meta(
884        &self,
885        bucket: &str,
886        key: &str,
887        version: Option<&str>,
888        meta: &ObjectMeta,
889    ) -> StoreResult<()> {
890        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
891        std::fs::create_dir_all(&dir)?;
892        crate::atomic::write_atomic_toml(&toml_path, meta)?;
893        Ok(())
894    }
895
896    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
897        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
898        for p in [&bin_path, &toml_path] {
899            match std::fs::remove_file(p) {
900                Ok(_) => {}
901                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
902                Err(e) => return Err(e.into()),
903            }
904        }
905        Self::cleanup_empty(&dir);
906
907        self.cache.invalidate(&crate::cache::BodyKey::new(
908            bucket.to_string(),
909            key.to_string(),
910            version.map(|s| s.to_string()),
911        ));
912        Ok(())
913    }
914
915    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
916        match body {
917            BodyRef::Memory(b) => Ok(b.clone()),
918            BodyRef::Disk {
919                bucket,
920                key,
921                version,
922                path,
923                size: _,
924            } => {
925                let body_key =
926                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
927                if let Some(bytes) = self.cache.get(&body_key) {
928                    return Ok(bytes);
929                }
930                let bytes = Bytes::from(std::fs::read(path)?);
931                self.cache.insert(body_key, bytes.clone());
932                Ok(bytes)
933            }
934        }
935    }
936
937    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
938        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
939        std::fs::create_dir_all(&parts_dir)?;
940        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
941        crate::atomic::write_atomic_toml(&init_path, init)?;
942        Ok(())
943    }
944
945    fn mpu_put_part(
946        &self,
947        bucket: &str,
948        upload_id: &str,
949        part_number: u32,
950        body: BodySource,
951        etag: &str,
952    ) -> StoreResult<()> {
953        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
954        std::fs::create_dir_all(&parts_dir)?;
955        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
956        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
957
958        let size: u64 = match body {
959            BodySource::Bytes(b) => {
960                let n = b.len() as u64;
961                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
962                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
963                self.cache.insert(cache_key, b);
964                n
965            }
966            BodySource::File(src) => {
967                let n = std::fs::metadata(&src)?.len();
968                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
969                self.cache
970                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
971                n
972            }
973            BodySource::FileCopy(src) => {
974                let n = std::fs::metadata(&src)?.len();
975                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
976                self.cache
977                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
978                n
979            }
980        };
981
982        let meta = UploadPartMeta {
983            part_number,
984            etag: etag.to_string(),
985            size,
986            last_modified: Utc::now(),
987        };
988        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
989        Ok(())
990    }
991
992    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
993        let dir = self.mpu_dir(bucket, upload_id);
994        // Invalidate any cached part bodies for this upload.
995        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
996            for entry in entries.flatten() {
997                let path = entry.path();
998                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
999                    if let Some(stem) = fname.strip_suffix(".bin") {
1000                        if let Ok(n) = stem.parse::<u32>() {
1001                            self.cache
1002                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1003                        }
1004                    }
1005                }
1006            }
1007        }
1008        match std::fs::remove_dir_all(&dir) {
1009            Ok(_) => Ok(()),
1010            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1011            Err(e) => Err(e.into()),
1012        }
1013    }
1014
1015    fn mpu_complete(
1016        &self,
1017        bucket: &str,
1018        upload_id: &str,
1019        final_key: &str,
1020        version: Option<&str>,
1021        meta: &ObjectMeta,
1022    ) -> StoreResult<BodyRef> {
1023        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1024        if !parts_dir.exists() {
1025            return Err(io_other(format!(
1026                "mpu_complete: no parts dir for upload {}",
1027                upload_id
1028            )));
1029        }
1030
1031        // Enumerate parts in ascending part-number order.
1032        let mut part_numbers: Vec<u32> = Vec::new();
1033        for entry in std::fs::read_dir(&parts_dir)? {
1034            let entry = entry?;
1035            let path = entry.path();
1036            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1037                continue;
1038            };
1039            if let Some(stem) = fname.strip_suffix(".toml") {
1040                if let Ok(n) = stem.parse::<u32>() {
1041                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1042                        part_numbers.push(n);
1043                    }
1044                }
1045            }
1046        }
1047        part_numbers.sort_unstable();
1048        if part_numbers.is_empty() {
1049            return Err(io_other(format!(
1050                "mpu_complete: upload {} has no parts",
1051                upload_id
1052            )));
1053        }
1054
1055        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1056        std::fs::create_dir_all(&dir)?;
1057
1058        let total_size: u64 = if part_numbers.len() == 1 {
1059            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1060            let sz = std::fs::metadata(&only)?.len();
1061            match std::fs::rename(&only, &bin_path) {
1062                Ok(_) => {}
1063                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1064                    // Cross-device rename: fall back to streaming copy then remove source.
1065                    {
1066                        let mut input = std::fs::File::open(&only)?;
1067                        let tmp = {
1068                            let mut os = bin_path.as_os_str().to_owned();
1069                            os.push(".tmp");
1070                            PathBuf::from(os)
1071                        };
1072                        let mut out = std::fs::OpenOptions::new()
1073                            .write(true)
1074                            .create(true)
1075                            .truncate(true)
1076                            .open(&tmp)?;
1077                        std::io::copy(&mut input, &mut out)?;
1078                        out.sync_all()?;
1079                        std::fs::rename(&tmp, &bin_path)?;
1080                    }
1081                    let _ = std::fs::remove_file(&only);
1082                }
1083                Err(e) => return Err(e.into()),
1084            }
1085            sz
1086        } else {
1087            let tmp = {
1088                let mut os = bin_path.as_os_str().to_owned();
1089                os.push(".tmp");
1090                PathBuf::from(os)
1091            };
1092            let mut total: u64 = 0;
1093            {
1094                let mut out = std::fs::OpenOptions::new()
1095                    .write(true)
1096                    .create(true)
1097                    .truncate(true)
1098                    .open(&tmp)?;
1099                for n in &part_numbers {
1100                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1101                    let mut input = std::fs::File::open(&part_path)?;
1102                    let copied = std::io::copy(&mut input, &mut out)?;
1103                    total += copied;
1104                }
1105                out.sync_all()?;
1106            }
1107            std::fs::rename(&tmp, &bin_path)?;
1108            if let Some(parent) = bin_path.parent() {
1109                if let Ok(dir_handle) = std::fs::File::open(parent) {
1110                    let _ = dir_handle.sync_all();
1111                }
1112            }
1113            total
1114        };
1115
1116        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1117
1118        // Invalidate per-part cache entries and drop the mpu dir.
1119        for n in &part_numbers {
1120            self.cache
1121                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1122        }
1123        let mpu_dir = self.mpu_dir(bucket, upload_id);
1124        let _ = std::fs::remove_dir_all(&mpu_dir);
1125
1126        // The concatenated body is deliberately NOT re-inserted into the cache —
1127        // large multipart uploads typically exceed the single-object cap, and we
1128        // must never round-trip the assembled body through RAM. The next
1129        // open_object_body call will load through the normal cache path.
1130        self.cache.invalidate(&crate::cache::BodyKey::new(
1131            bucket.to_string(),
1132            final_key.to_string(),
1133            version.map(|s| s.to_string()),
1134        ));
1135
1136        Ok(BodyRef::Disk {
1137            bucket: bucket.to_string(),
1138            key: final_key.to_string(),
1139            version: version.map(|s| s.to_string()),
1140            path: bin_path,
1141            size: total_size,
1142        })
1143    }
1144}
1145
/// Raw OS error code that `std::fs::rename` reports when the source and the
/// destination are on different filesystems or volumes.
///
/// On Unix this is `EXDEV`, which is 18 on Linux, macOS and the BSDs. On
/// Windows the rename fails with the Win32 error `ERROR_NOT_SAME_DEVICE` (17)
/// instead. Comparing against 18 there would never match, so the cross-device
/// copy fallback would never run.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        // ERROR_NOT_SAME_DEVICE
        17
    } else {
        // EXDEV
        18
    }
}
1149
1150#[cfg(test)]
1151mod disk_tests {
1152    use super::*;
1153    use std::sync::Arc;
1154    use tempfile::TempDir;
1155
1156    fn new_store(tmp: &TempDir) -> DiskS3Store {
1157        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1158        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1159    }
1160
1161    fn new_store_with_cache(
1162        tmp: &TempDir,
1163        cap: u64,
1164    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1165        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1166        (
1167            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1168            cache,
1169        )
1170    }
1171
1172    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1173        ObjectMeta {
1174            key: key.to_string(),
1175            content_type: "application/octet-stream".to_string(),
1176            etag: "etag".to_string(),
1177            size,
1178            ..Default::default()
1179        }
1180    }
1181
1182    #[test]
1183    fn put_bucket_meta_roundtrip() {
1184        let tmp = TempDir::new().unwrap();
1185        let store = new_store(&tmp);
1186        let meta = BucketMeta {
1187            name: "b1".to_string(),
1188            region: "us-east-1".to_string(),
1189            versioning: Some("Enabled".to_string()),
1190            ..Default::default()
1191        };
1192        store.put_bucket_meta("b1", &meta).unwrap();
1193        let loaded = store.load().unwrap();
1194        let snap = loaded.buckets.get("b1").unwrap();
1195        assert_eq!(snap.meta.name, "b1");
1196        assert_eq!(snap.meta.region, "us-east-1");
1197        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1198    }
1199
1200    #[test]
1201    fn put_bucket_subresource_each_variant_writes_file() {
1202        let tmp = TempDir::new().unwrap();
1203        let store = new_store(&tmp);
1204        store
1205            .put_bucket_meta(
1206                "b",
1207                &BucketMeta {
1208                    name: "b".to_string(),
1209                    ..Default::default()
1210                },
1211            )
1212            .unwrap();
1213        let variants = [
1214            BucketSubresource::Tags,
1215            BucketSubresource::Lifecycle,
1216            BucketSubresource::Cors,
1217            BucketSubresource::Policy,
1218            BucketSubresource::Notification,
1219            BucketSubresource::Logging,
1220            BucketSubresource::Website,
1221            BucketSubresource::PublicAccessBlock,
1222            BucketSubresource::ObjectLock,
1223            BucketSubresource::Replication,
1224            BucketSubresource::Ownership,
1225            BucketSubresource::Inventory,
1226            BucketSubresource::Encryption,
1227            BucketSubresource::Versioning,
1228            BucketSubresource::Acl,
1229            BucketSubresource::Accelerate,
1230        ];
1231        for v in variants {
1232            store
1233                .put_bucket_subresource("b", v, "payload=true")
1234                .unwrap();
1235            let file = store
1236                .bucket_dir("b")
1237                .join(DiskS3Store::subresource_filename(v));
1238            assert!(file.exists(), "{:?}", v);
1239            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1240        }
1241    }
1242
1243    #[test]
1244    fn delete_bucket_subresource_removes_file() {
1245        let tmp = TempDir::new().unwrap();
1246        let store = new_store(&tmp);
1247        store
1248            .put_bucket_meta(
1249                "b",
1250                &BucketMeta {
1251                    name: "b".to_string(),
1252                    ..Default::default()
1253                },
1254            )
1255            .unwrap();
1256        store
1257            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1258            .unwrap();
1259        store
1260            .delete_bucket_subresource("b", BucketSubresource::Tags)
1261            .unwrap();
1262        let file = store.bucket_dir("b").join("tags.toml");
1263        assert!(!file.exists());
1264        // idempotent
1265        store
1266            .delete_bucket_subresource("b", BucketSubresource::Tags)
1267            .unwrap();
1268    }
1269
1270    #[test]
1271    fn put_object_bytes_roundtrip() {
1272        let tmp = TempDir::new().unwrap();
1273        let store = new_store(&tmp);
1274        store
1275            .put_bucket_meta(
1276                "b",
1277                &BucketMeta {
1278                    name: "b".to_string(),
1279                    ..Default::default()
1280                },
1281            )
1282            .unwrap();
1283        let data = Bytes::from_static(b"hello world");
1284        let meta = sample_meta("k1", data.len() as u64);
1285        let body_ref = store
1286            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1287            .unwrap();
1288        match &body_ref {
1289            BodyRef::Disk {
1290                bucket,
1291                key,
1292                size,
1293                path,
1294                ..
1295            } => {
1296                assert_eq!(bucket, "b");
1297                assert_eq!(key, "k1");
1298                assert_eq!(*size, data.len() as u64);
1299                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1300            }
1301            _ => panic!("expected Disk"),
1302        }
1303        let loaded = store.load().unwrap();
1304        let snap = loaded.buckets.get("b").unwrap();
1305        let obj = snap.objects.get("k1").unwrap();
1306        assert_eq!(obj.meta.size, data.len() as u64);
1307    }
1308
1309    #[test]
1310    fn put_object_file_copy_source_leaves_src_in_place() {
1311        let tmp = TempDir::new().unwrap();
1312        let store = new_store(&tmp);
1313        store
1314            .put_bucket_meta(
1315                "b",
1316                &BucketMeta {
1317                    name: "b".to_string(),
1318                    ..Default::default()
1319                },
1320            )
1321            .unwrap();
1322        let src_dir = TempDir::new().unwrap();
1323        let src = src_dir.path().join("src.bin");
1324        std::fs::write(&src, b"copied-body").unwrap();
1325        let meta = sample_meta("k", 11);
1326        let bref = store
1327            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1328            .unwrap();
1329        match bref {
1330            BodyRef::Disk { path, size, .. } => {
1331                assert_eq!(size, 11);
1332                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1333            }
1334            _ => panic!("expected Disk bodyref"),
1335        }
1336        assert!(src.exists(), "source file must not be moved by FileCopy");
1337        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1338    }
1339
1340    #[test]
1341    fn put_object_file_source() {
1342        let tmp = TempDir::new().unwrap();
1343        let store = new_store(&tmp);
1344        store
1345            .put_bucket_meta(
1346                "b",
1347                &BucketMeta {
1348                    name: "b".to_string(),
1349                    ..Default::default()
1350                },
1351            )
1352            .unwrap();
1353        let src = tmp.path().join("src.bin");
1354        std::fs::write(&src, b"file-body").unwrap();
1355        let meta = sample_meta("k", 9);
1356        let body_ref = store
1357            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1358            .unwrap();
1359        let path = match body_ref {
1360            BodyRef::Disk { path, .. } => path,
1361            _ => panic!(),
1362        };
1363        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1364    }
1365
1366    #[test]
1367    fn put_object_meta_only_keeps_bin() {
1368        let tmp = TempDir::new().unwrap();
1369        let store = new_store(&tmp);
1370        store
1371            .put_bucket_meta(
1372                "b",
1373                &BucketMeta {
1374                    name: "b".to_string(),
1375                    ..Default::default()
1376                },
1377            )
1378            .unwrap();
1379        let data = Bytes::from_static(b"abc");
1380        let mut meta = sample_meta("k", 3);
1381        store
1382            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1383            .unwrap();
1384        let (_, bin, _) = store.object_paths("b", "k", None);
1385        let before = std::fs::read(&bin).unwrap();
1386        meta.tags.insert("x".to_string(), "y".to_string());
1387        store.put_object_meta("b", "k", None, &meta).unwrap();
1388        assert_eq!(std::fs::read(&bin).unwrap(), before);
1389        let loaded = store.load().unwrap();
1390        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1391        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1392    }
1393
1394    #[test]
1395    fn delete_object_cleans_up_files_and_cache() {
1396        let tmp = TempDir::new().unwrap();
1397        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1398        store
1399            .put_bucket_meta(
1400                "b",
1401                &BucketMeta {
1402                    name: "b".to_string(),
1403                    ..Default::default()
1404                },
1405            )
1406            .unwrap();
1407        let data = Bytes::from_static(b"bye");
1408        let meta = sample_meta("k", 3);
1409        store
1410            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1411            .unwrap();
1412        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1413        assert!(cache.get(&body_key).is_some());
1414        store.delete_object("b", "k", None).unwrap();
1415        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1416        assert!(!bin.exists());
1417        assert!(!toml_path.exists());
1418        assert!(!dir.exists());
1419        assert!(cache.get(&body_key).is_none());
1420    }
1421
1422    #[test]
1423    fn open_object_body_cache_hit_and_refill() {
1424        let tmp = TempDir::new().unwrap();
1425        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1426        store
1427            .put_bucket_meta(
1428                "b",
1429                &BucketMeta {
1430                    name: "b".to_string(),
1431                    ..Default::default()
1432                },
1433            )
1434            .unwrap();
1435        let data = Bytes::from_static(b"payload");
1436        let meta = sample_meta("k", data.len() as u64);
1437        let body_ref = store
1438            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1439            .unwrap();
1440        // Cache hit.
1441        let got = store.open_object_body(&body_ref).unwrap();
1442        assert_eq!(got, data);
1443        // Invalidate and re-read populates cache from disk.
1444        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1445        cache.invalidate(&body_key);
1446        assert!(cache.get(&body_key).is_none());
1447        let got = store.open_object_body(&body_ref).unwrap();
1448        assert_eq!(got, data);
1449        assert!(cache.get(&body_key).is_some());
1450    }
1451
1452    #[test]
1453    fn open_object_body_large_bypasses_cache() {
1454        let tmp = TempDir::new().unwrap();
1455        // capacity 1024 → single-object cap 512. Use 800-byte body.
1456        let (store, cache) = new_store_with_cache(&tmp, 1024);
1457        store
1458            .put_bucket_meta(
1459                "b",
1460                &BucketMeta {
1461                    name: "b".to_string(),
1462                    ..Default::default()
1463                },
1464            )
1465            .unwrap();
1466        let data = Bytes::from(vec![7u8; 800]);
1467        let meta = sample_meta("big", 800);
1468        let body_ref = store
1469            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1470            .unwrap();
1471        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1472        assert!(cache.get(&body_key).is_none());
1473        let got = store.open_object_body(&body_ref).unwrap();
1474        assert_eq!(got, data);
1475        // Still none — exceeds single-object cap.
1476        assert!(cache.get(&body_key).is_none());
1477    }
1478
1479    #[test]
1480    fn load_empty_dir() {
1481        let tmp = TempDir::new().unwrap();
1482        let store = new_store(&tmp);
1483        let s = store.load().unwrap();
1484        assert!(s.buckets.is_empty());
1485    }
1486
1487    #[test]
1488    fn load_skips_mpu_without_init() {
1489        let tmp = TempDir::new().unwrap();
1490        let store = new_store(&tmp);
1491        store
1492            .put_bucket_meta(
1493                "b",
1494                &BucketMeta {
1495                    name: "b".to_string(),
1496                    ..Default::default()
1497                },
1498            )
1499            .unwrap();
1500        let data = Bytes::from_static(b"x");
1501        let meta = sample_meta("k", 1);
1502        store
1503            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1504            .unwrap();
1505        // Directory with no init.toml is skipped by load.
1506        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1507        std::fs::create_dir_all(&mpu).unwrap();
1508
1509        let loaded = store.load().unwrap();
1510        let snap = loaded.buckets.get("b").unwrap();
1511        assert_eq!(snap.objects.len(), 1);
1512        assert!(snap.multipart_uploads.is_empty());
1513    }
1514
1515    #[test]
1516    fn load_reads_bucket_subresources() {
1517        let tmp = TempDir::new().unwrap();
1518        let store = new_store(&tmp);
1519        store
1520            .put_bucket_meta(
1521                "b",
1522                &BucketMeta {
1523                    name: "b".to_string(),
1524                    ..Default::default()
1525                },
1526            )
1527            .unwrap();
1528        store
1529            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1530            .unwrap();
1531        store
1532            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1533            .unwrap();
1534        store
1535            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1536            .unwrap();
1537        store
1538            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1539            .unwrap();
1540        let loaded = store.load().unwrap();
1541        let snap = loaded.buckets.get("b").unwrap();
1542        assert_eq!(
1543            snap.subresources.get("lifecycle.toml").map(String::as_str),
1544            Some("<Lifecycle/>"),
1545        );
1546        assert_eq!(
1547            snap.subresources.get("cors.toml").map(String::as_str),
1548            Some("<Cors/>"),
1549        );
1550        assert_eq!(
1551            snap.subresources.get("policy.toml").map(String::as_str),
1552            Some("{}"),
1553        );
1554        assert_eq!(
1555            snap.subresources.get("tags.toml").map(String::as_str),
1556            Some("x=1"),
1557        );
1558    }
1559
1560    #[test]
1561    fn versioned_put_load_roundtrip() {
1562        let tmp = TempDir::new().unwrap();
1563        let store = new_store(&tmp);
1564        store
1565            .put_bucket_meta(
1566                "b",
1567                &BucketMeta {
1568                    name: "b".to_string(),
1569                    versioning: Some("Enabled".to_string()),
1570                    ..Default::default()
1571                },
1572            )
1573            .unwrap();
1574        let base = chrono::Utc::now();
1575        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1576            .iter()
1577            .enumerate()
1578        {
1579            let mut m = sample_meta("k", body.len() as u64);
1580            m.version_id = Some((*vid).to_string());
1581            m.last_modified = base + chrono::Duration::seconds(i as i64);
1582            store
1583                .put_object(
1584                    "b",
1585                    "k",
1586                    Some(*vid),
1587                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1588                    &m,
1589                )
1590                .unwrap();
1591        }
1592        let loaded = store.load().unwrap();
1593        let snap = loaded.buckets.get("b").unwrap();
1594        let versions = snap.object_versions.get("k").expect("versions present");
1595        assert_eq!(versions.len(), 3);
1596        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1597        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1598        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1599        for v in versions {
1600            match &v.body {
1601                BodyRef::Disk { size, .. } => assert!(*size > 0),
1602                _ => panic!("expected Disk body"),
1603            }
1604        }
1605    }
1606
1607    #[test]
1608    fn versioned_load_promotes_latest_live_to_snap_objects() {
1609        let tmp = TempDir::new().unwrap();
1610        let store = new_store(&tmp);
1611        store
1612            .put_bucket_meta(
1613                "b",
1614                &BucketMeta {
1615                    name: "b".to_string(),
1616                    versioning: Some("Enabled".to_string()),
1617                    ..Default::default()
1618                },
1619            )
1620            .unwrap();
1621        let base = chrono::Utc::now();
1622        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1623            .iter()
1624            .enumerate()
1625        {
1626            let mut m = sample_meta("k", body.len() as u64);
1627            m.version_id = Some((*vid).to_string());
1628            m.last_modified = base + chrono::Duration::seconds(i as i64);
1629            store
1630                .put_object(
1631                    "b",
1632                    "k",
1633                    Some(*vid),
1634                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1635                    &m,
1636                )
1637                .unwrap();
1638        }
1639        let loaded = store.load().unwrap();
1640        let snap = loaded.buckets.get("b").unwrap();
1641        let current = snap.objects.get("k").expect("current object promoted");
1642        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1643    }
1644
1645    #[test]
1646    fn versioned_load_trailing_delete_marker_hides_current() {
1647        let tmp = TempDir::new().unwrap();
1648        let store = new_store(&tmp);
1649        store
1650            .put_bucket_meta(
1651                "b",
1652                &BucketMeta {
1653                    name: "b".to_string(),
1654                    versioning: Some("Enabled".to_string()),
1655                    ..Default::default()
1656                },
1657            )
1658            .unwrap();
1659        let base = chrono::Utc::now();
1660        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1661            .iter()
1662            .enumerate()
1663        {
1664            let mut m = sample_meta("k", body.len() as u64);
1665            m.version_id = Some((*vid).to_string());
1666            m.last_modified = base + chrono::Duration::seconds(i as i64);
1667            store
1668                .put_object(
1669                    "b",
1670                    "k",
1671                    Some(*vid),
1672                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1673                    &m,
1674                )
1675                .unwrap();
1676        }
1677        // Append a delete marker on top.
1678        let mut dm = sample_meta("k", 0);
1679        dm.version_id = Some("dm1".to_string());
1680        dm.is_delete_marker = true;
1681        dm.last_modified = base + chrono::Duration::seconds(10);
1682        store
1683            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1684            .unwrap();
1685        let loaded = store.load().unwrap();
1686        let snap = loaded.buckets.get("b").unwrap();
1687        assert!(
1688            !snap.objects.contains_key("k"),
1689            "trailing delete marker must hide current object",
1690        );
1691        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1692    }
1693
1694    #[test]
1695    fn legacy_null_object_overridden_by_newer_versions() {
1696        // A pre-versioning null put followed by versioning-enabled puts must
1697        // see snap.objects reflect the newest live version, not the stale null.
1698        let tmp = TempDir::new().unwrap();
1699        let store = new_store(&tmp);
1700        store
1701            .put_bucket_meta(
1702                "b",
1703                &BucketMeta {
1704                    name: "b".to_string(),
1705                    ..Default::default()
1706                },
1707            )
1708            .unwrap();
1709        let base = chrono::Utc::now();
1710        let mut null_meta = sample_meta("k", 3);
1711        null_meta.last_modified = base;
1712        store
1713            .put_object(
1714                "b",
1715                "k",
1716                None,
1717                BodySource::Bytes(Bytes::from_static(b"old")),
1718                &null_meta,
1719            )
1720            .unwrap();
1721        // Enable versioning and put two versions, the second a delete.
1722        store
1723            .put_bucket_meta(
1724                "b",
1725                &BucketMeta {
1726                    name: "b".to_string(),
1727                    versioning: Some("Enabled".to_string()),
1728                    ..Default::default()
1729                },
1730            )
1731            .unwrap();
1732        let mut v1 = sample_meta("k", 3);
1733        v1.version_id = Some("v1".to_string());
1734        v1.last_modified = base + chrono::Duration::seconds(1);
1735        store
1736            .put_object(
1737                "b",
1738                "k",
1739                Some("v1"),
1740                BodySource::Bytes(Bytes::from_static(b"new")),
1741                &v1,
1742            )
1743            .unwrap();
1744        let mut v2 = sample_meta("k", 5);
1745        v2.version_id = Some("v2".to_string());
1746        v2.last_modified = base + chrono::Duration::seconds(2);
1747        store
1748            .put_object(
1749                "b",
1750                "k",
1751                Some("v2"),
1752                BodySource::Bytes(Bytes::from_static(b"newer")),
1753                &v2,
1754            )
1755            .unwrap();
1756        let loaded = store.load().unwrap();
1757        let snap = loaded.buckets.get("b").unwrap();
1758        let current = snap
1759            .objects
1760            .get("k")
1761            .expect("latest live version must override stale null");
1762        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1763        assert_eq!(current.meta.size, 5);
1764    }
1765
1766    #[test]
1767    fn legacy_null_hidden_by_trailing_delete_marker() {
1768        let tmp = TempDir::new().unwrap();
1769        let store = new_store(&tmp);
1770        store
1771            .put_bucket_meta(
1772                "b",
1773                &BucketMeta {
1774                    name: "b".to_string(),
1775                    ..Default::default()
1776                },
1777            )
1778            .unwrap();
1779        let base = chrono::Utc::now();
1780        let mut null_meta = sample_meta("k", 3);
1781        null_meta.last_modified = base;
1782        store
1783            .put_object(
1784                "b",
1785                "k",
1786                None,
1787                BodySource::Bytes(Bytes::from_static(b"old")),
1788                &null_meta,
1789            )
1790            .unwrap();
1791        store
1792            .put_bucket_meta(
1793                "b",
1794                &BucketMeta {
1795                    name: "b".to_string(),
1796                    versioning: Some("Enabled".to_string()),
1797                    ..Default::default()
1798                },
1799            )
1800            .unwrap();
1801        let mut v1 = sample_meta("k", 3);
1802        v1.version_id = Some("v1".to_string());
1803        v1.last_modified = base + chrono::Duration::seconds(1);
1804        store
1805            .put_object(
1806                "b",
1807                "k",
1808                Some("v1"),
1809                BodySource::Bytes(Bytes::from_static(b"new")),
1810                &v1,
1811            )
1812            .unwrap();
1813        let mut dm = sample_meta("k", 0);
1814        dm.version_id = Some("dm1".to_string());
1815        dm.is_delete_marker = true;
1816        dm.last_modified = base + chrono::Duration::seconds(2);
1817        store
1818            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1819            .unwrap();
1820        let loaded = store.load().unwrap();
1821        let snap = loaded.buckets.get("b").unwrap();
1822        assert!(
1823            !snap.objects.contains_key("k"),
1824            "trailing delete marker must hide even a legacy null object",
1825        );
1826    }
1827
1828    #[test]
1829    fn delete_marker_roundtrip_no_body_file() {
1830        let tmp = TempDir::new().unwrap();
1831        let store = new_store(&tmp);
1832        store
1833            .put_bucket_meta(
1834                "b",
1835                &BucketMeta {
1836                    name: "b".to_string(),
1837                    versioning: Some("Enabled".to_string()),
1838                    ..Default::default()
1839                },
1840            )
1841            .unwrap();
1842        let mut m = sample_meta("k", 0);
1843        m.version_id = Some("dm1".to_string());
1844        m.is_delete_marker = true;
1845        store
1846            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1847            .unwrap();
1848        // No .bin file on disk for delete markers.
1849        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1850        assert!(!bin.exists(), "delete marker must not have a .bin file");
1851        assert!(toml_path.exists());
1852
1853        let loaded = store.load().unwrap();
1854        let versions = loaded
1855            .buckets
1856            .get("b")
1857            .unwrap()
1858            .object_versions
1859            .get("k")
1860            .unwrap();
1861        assert_eq!(versions.len(), 1);
1862        assert!(versions[0].meta.is_delete_marker);
1863        match &versions[0].body {
1864            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1865            _ => panic!("delete marker body should be empty Memory"),
1866        }
1867    }
1868
1869    #[test]
1870    fn mixed_nonversioned_and_versioned_buckets() {
1871        let tmp = TempDir::new().unwrap();
1872        let store = new_store(&tmp);
1873        store
1874            .put_bucket_meta(
1875                "a",
1876                &BucketMeta {
1877                    name: "a".to_string(),
1878                    ..Default::default()
1879                },
1880            )
1881            .unwrap();
1882        store
1883            .put_bucket_meta(
1884                "b",
1885                &BucketMeta {
1886                    name: "b".to_string(),
1887                    versioning: Some("Enabled".to_string()),
1888                    ..Default::default()
1889                },
1890            )
1891            .unwrap();
1892        let ma = sample_meta("only", 3);
1893        store
1894            .put_object(
1895                "a",
1896                "only",
1897                None,
1898                BodySource::Bytes(Bytes::from_static(b"aaa")),
1899                &ma,
1900            )
1901            .unwrap();
1902        let base = chrono::Utc::now();
1903        for (i, vid) in ["v1", "v2"].iter().enumerate() {
1904            let mut m = sample_meta("twice", 2);
1905            m.version_id = Some((*vid).to_string());
1906            m.last_modified = base + chrono::Duration::seconds(i as i64);
1907            store
1908                .put_object(
1909                    "b",
1910                    "twice",
1911                    Some(*vid),
1912                    BodySource::Bytes(Bytes::from_static(b"xx")),
1913                    &m,
1914                )
1915                .unwrap();
1916        }
1917        let loaded = store.load().unwrap();
1918        assert_eq!(loaded.buckets.len(), 2);
1919        let a = loaded.buckets.get("a").unwrap();
1920        assert_eq!(a.objects.len(), 1);
1921        assert!(a.object_versions.is_empty());
1922        let b = loaded.buckets.get("b").unwrap();
1923        // Fix #5: the latest live version is promoted into objects so
1924        // unversioned GETs see it post-restart.
1925        assert_eq!(
1926            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
1927            Some("v2")
1928        );
1929        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
1930    }
1931
1932    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1933        MpuInit {
1934            upload_id: upload_id.to_string(),
1935            key: key.to_string(),
1936            initiated: chrono::Utc::now(),
1937            content_type: "application/octet-stream".to_string(),
1938            storage_class: "STANDARD".to_string(),
1939            ..Default::default()
1940        }
1941    }
1942
1943    #[test]
1944    fn mpu_create_then_load_empty_parts() {
1945        let tmp = TempDir::new().unwrap();
1946        let store = new_store(&tmp);
1947        store
1948            .put_bucket_meta(
1949                "b",
1950                &BucketMeta {
1951                    name: "b".to_string(),
1952                    ..Default::default()
1953                },
1954            )
1955            .unwrap();
1956        let init = sample_mpu_init("up1", "k1");
1957        store.mpu_create("b", "up1", &init).unwrap();
1958        let loaded = store.load().unwrap();
1959        let snap = loaded.buckets.get("b").unwrap();
1960        let m = snap.multipart_uploads.get("up1").expect("upload present");
1961        assert_eq!(m.init.upload_id, "up1");
1962        assert_eq!(m.init.key, "k1");
1963        assert!(m.parts.is_empty());
1964    }
1965
1966    #[test]
1967    fn mpu_put_part_then_load_three_parts() {
1968        let tmp = TempDir::new().unwrap();
1969        let store = new_store(&tmp);
1970        store
1971            .put_bucket_meta(
1972                "b",
1973                &BucketMeta {
1974                    name: "b".to_string(),
1975                    ..Default::default()
1976                },
1977            )
1978            .unwrap();
1979        store
1980            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
1981            .unwrap();
1982        let bodies = [
1983            Bytes::from_static(b"part-one"),
1984            Bytes::from_static(b"part-two-longer"),
1985            Bytes::from_static(b"p3"),
1986        ];
1987        for (i, body) in bodies.iter().enumerate() {
1988            let n = (i + 1) as u32;
1989            store
1990                .mpu_put_part(
1991                    "b",
1992                    "up",
1993                    n,
1994                    BodySource::Bytes(body.clone()),
1995                    &format!("et{}", n),
1996                )
1997                .unwrap();
1998        }
1999        let loaded = store.load().unwrap();
2000        let snap = loaded.buckets.get("b").unwrap();
2001        let m = snap.multipart_uploads.get("up").unwrap();
2002        assert_eq!(m.parts.len(), 3);
2003        for (i, body) in bodies.iter().enumerate() {
2004            let n = (i + 1) as u32;
2005            let part = m.parts.get(&n).unwrap();
2006            assert_eq!(part.meta.part_number, n);
2007            assert_eq!(part.meta.size, body.len() as u64);
2008            assert_eq!(part.meta.etag, format!("et{}", n));
2009            match &part.body {
2010                BodyRef::Disk { path, size, .. } => {
2011                    assert_eq!(*size, body.len() as u64);
2012                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2013                }
2014                _ => panic!("expected Disk"),
2015            }
2016        }
2017    }
2018
2019    #[test]
2020    fn mpu_abort_removes_upload() {
2021        let tmp = TempDir::new().unwrap();
2022        let store = new_store(&tmp);
2023        store
2024            .put_bucket_meta(
2025                "b",
2026                &BucketMeta {
2027                    name: "b".to_string(),
2028                    ..Default::default()
2029                },
2030            )
2031            .unwrap();
2032        store
2033            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2034            .unwrap();
2035        store
2036            .mpu_put_part(
2037                "b",
2038                "up",
2039                1,
2040                BodySource::Bytes(Bytes::from_static(b"x")),
2041                "e",
2042            )
2043            .unwrap();
2044        store.mpu_abort("b", "up").unwrap();
2045        assert!(!store.mpu_dir("b", "up").exists());
2046        // Idempotent.
2047        store.mpu_abort("b", "up").unwrap();
2048        let loaded = store.load().unwrap();
2049        let snap = loaded.buckets.get("b").unwrap();
2050        assert!(snap.multipart_uploads.is_empty());
2051    }
2052
2053    #[test]
2054    fn mpu_complete_single_part_fast_path() {
2055        let tmp = TempDir::new().unwrap();
2056        let store = new_store(&tmp);
2057        store
2058            .put_bucket_meta(
2059                "b",
2060                &BucketMeta {
2061                    name: "b".to_string(),
2062                    ..Default::default()
2063                },
2064            )
2065            .unwrap();
2066        store
2067            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2068            .unwrap();
2069        let body = Bytes::from_static(b"only-part-bytes");
2070        store
2071            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2072            .unwrap();
2073        let meta = sample_meta("k", body.len() as u64);
2074        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2075        match &body_ref {
2076            BodyRef::Disk { path, size, .. } => {
2077                assert_eq!(*size, body.len() as u64);
2078                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2079            }
2080            _ => panic!("expected Disk"),
2081        }
2082        assert!(!store.mpu_dir("b", "up").exists());
2083    }
2084
2085    #[test]
2086    fn mpu_complete_multi_part_concat() {
2087        let tmp = TempDir::new().unwrap();
2088        let store = new_store(&tmp);
2089        store
2090            .put_bucket_meta(
2091                "b",
2092                &BucketMeta {
2093                    name: "b".to_string(),
2094                    ..Default::default()
2095                },
2096            )
2097            .unwrap();
2098        store
2099            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2100            .unwrap();
2101        let p1 = Bytes::from_static(b"AAAA");
2102        let p2 = Bytes::from_static(b"BBBBBB");
2103        let p3 = Bytes::from_static(b"CC");
2104        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2105            store
2106                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2107                .unwrap();
2108        }
2109        let mut expected = Vec::new();
2110        expected.extend_from_slice(&p1);
2111        expected.extend_from_slice(&p2);
2112        expected.extend_from_slice(&p3);
2113        let meta = sample_meta("k", expected.len() as u64);
2114        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2115        let path = match body_ref {
2116            BodyRef::Disk { path, size, .. } => {
2117                assert_eq!(size, expected.len() as u64);
2118                path
2119            }
2120            _ => panic!("expected Disk"),
2121        };
2122        assert_eq!(std::fs::read(&path).unwrap(), expected);
2123        assert!(!store.mpu_dir("b", "up").exists());
2124    }
2125
2126    #[test]
2127    fn mpu_complete_large_streaming_via_file_source() {
2128        let tmp = TempDir::new().unwrap();
2129        let store = new_store(&tmp);
2130        store
2131            .put_bucket_meta(
2132                "b",
2133                &BucketMeta {
2134                    name: "b".to_string(),
2135                    ..Default::default()
2136                },
2137            )
2138            .unwrap();
2139        store
2140            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2141            .unwrap();
2142
2143        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2144        // exercise the BodySource::File + streaming concat path).
2145        const PART_SIZE: usize = 1024 * 1024;
2146        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2147        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2148        for (i, byte) in patterns.iter().enumerate() {
2149            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2150            let data = vec![*byte; PART_SIZE];
2151            std::fs::write(&src, &data).unwrap();
2152            expected.extend_from_slice(&data);
2153            store
2154                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2155                .unwrap();
2156        }
2157        let meta = sample_meta("k", expected.len() as u64);
2158        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2159        let path = match body_ref {
2160            BodyRef::Disk { path, size, .. } => {
2161                assert_eq!(size, expected.len() as u64);
2162                path
2163            }
2164            _ => panic!("expected Disk"),
2165        };
2166        let actual = std::fs::read(&path).unwrap();
2167        assert_eq!(actual.len(), expected.len());
2168        assert_eq!(actual, expected);
2169        assert!(!store.mpu_dir("b", "up").exists());
2170    }
2171
2172    #[test]
2173    fn mpu_resumable_across_load() {
2174        let tmp = TempDir::new().unwrap();
2175        let store = new_store(&tmp);
2176        store
2177            .put_bucket_meta(
2178                "b",
2179                &BucketMeta {
2180                    name: "b".to_string(),
2181                    ..Default::default()
2182                },
2183            )
2184            .unwrap();
2185        store
2186            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2187            .unwrap();
2188        let p1 = Bytes::from_static(b"hello-");
2189        let p2 = Bytes::from_static(b"world-");
2190        store
2191            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2192            .unwrap();
2193        store
2194            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2195            .unwrap();
2196
2197        // Simulate a restart: re-open the store on the same dir and load.
2198        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2199        let loaded = store2.load().unwrap();
2200        let snap = loaded.buckets.get("b").unwrap();
2201        let m = snap.multipart_uploads.get("up").unwrap();
2202        assert_eq!(m.parts.len(), 2);
2203        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2204        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2205
2206        // Continue the upload on the fresh store.
2207        let p3 = Bytes::from_static(b"again!");
2208        store2
2209            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2210            .unwrap();
2211        let mut expected = Vec::new();
2212        expected.extend_from_slice(&p1);
2213        expected.extend_from_slice(&p2);
2214        expected.extend_from_slice(&p3);
2215        let meta = sample_meta("k", expected.len() as u64);
2216        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2217        let path = match body_ref {
2218            BodyRef::Disk { path, .. } => path,
2219            _ => panic!(),
2220        };
2221        assert_eq!(std::fs::read(&path).unwrap(), expected);
2222        assert!(!store2.mpu_dir("b", "up").exists());
2223    }
2224
2225    #[test]
2226    fn tags_snapshot_roundtrip_via_store() {
2227        let tmp = TempDir::new().unwrap();
2228        let store = new_store(&tmp);
2229        store
2230            .put_bucket_meta(
2231                "b",
2232                &BucketMeta {
2233                    name: "b".to_string(),
2234                    ..Default::default()
2235                },
2236            )
2237            .unwrap();
2238        let mut tags = HashMap::new();
2239        tags.insert("env".to_string(), "prod".to_string());
2240        tags.insert("team".to_string(), "s3".to_string());
2241        let snap = TagsSnapshot { tags: tags.clone() };
2242        let payload = toml::to_string(&snap).unwrap();
2243        store
2244            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2245            .unwrap();
2246        let loaded = store.load().unwrap();
2247        let text = loaded
2248            .buckets
2249            .get("b")
2250            .unwrap()
2251            .subresources
2252            .get("tags.toml")
2253            .cloned()
2254            .unwrap();
2255        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2256        assert_eq!(decoded.tags, tags);
2257    }
2258
2259    #[test]
2260    fn acl_snapshot_roundtrip_via_store() {
2261        let tmp = TempDir::new().unwrap();
2262        let store = new_store(&tmp);
2263        store
2264            .put_bucket_meta(
2265                "b",
2266                &BucketMeta {
2267                    name: "b".to_string(),
2268                    ..Default::default()
2269                },
2270            )
2271            .unwrap();
2272        let snap = AclSnapshot {
2273            owner_id: "owner-abc".to_string(),
2274            grants: vec![
2275                AclGrantSnapshot {
2276                    grantee_type: "CanonicalUser".to_string(),
2277                    grantee_id: Some("owner-abc".to_string()),
2278                    grantee_display_name: Some("owner".to_string()),
2279                    grantee_uri: None,
2280                    permission: "FULL_CONTROL".to_string(),
2281                },
2282                AclGrantSnapshot {
2283                    grantee_type: "Group".to_string(),
2284                    grantee_id: None,
2285                    grantee_display_name: None,
2286                    grantee_uri: Some(
2287                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2288                    ),
2289                    permission: "READ".to_string(),
2290                },
2291            ],
2292        };
2293        let payload = toml::to_string(&snap).unwrap();
2294        store
2295            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2296            .unwrap();
2297        let loaded = store.load().unwrap();
2298        let text = loaded
2299            .buckets
2300            .get("b")
2301            .unwrap()
2302            .subresources
2303            .get("acl.toml")
2304            .cloned()
2305            .unwrap();
2306        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2307        assert_eq!(decoded.owner_id, "owner-abc");
2308        assert_eq!(decoded.grants.len(), 2);
2309        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2310        assert_eq!(decoded.grants[1].grantee_type, "Group");
2311    }
2312
2313    #[test]
2314    fn inventory_snapshot_roundtrip_via_store() {
2315        let tmp = TempDir::new().unwrap();
2316        let store = new_store(&tmp);
2317        store
2318            .put_bucket_meta(
2319                "b",
2320                &BucketMeta {
2321                    name: "b".to_string(),
2322                    ..Default::default()
2323                },
2324            )
2325            .unwrap();
2326        let mut configs = HashMap::new();
2327        configs.insert(
2328            "inv-1".to_string(),
2329            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2330        );
2331        configs.insert(
2332            "inv-2".to_string(),
2333            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2334        );
2335        let snap = InventorySnapshot {
2336            configs: configs.clone(),
2337        };
2338        let payload = toml::to_string(&snap).unwrap();
2339        store
2340            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2341            .unwrap();
2342        let loaded = store.load().unwrap();
2343        let text = loaded
2344            .buckets
2345            .get("b")
2346            .unwrap()
2347            .subresources
2348            .get("inventory.toml")
2349            .cloned()
2350            .unwrap();
2351        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2352        assert_eq!(decoded.configs, configs);
2353    }
2354
2355    #[test]
2356    fn legacy_versioning_file_is_read() {
2357        let tmp = TempDir::new().unwrap();
2358        let store = new_store(&tmp);
2359        // Bucket meta with no versioning field set.
2360        store
2361            .put_bucket_meta(
2362                "b",
2363                &BucketMeta {
2364                    name: "b".to_string(),
2365                    ..Default::default()
2366                },
2367            )
2368            .unwrap();
2369        // Legacy sidecar: a bare versioning.toml with "Enabled".
2370        let path = store.bucket_dir("b").join("versioning.toml");
2371        std::fs::write(&path, "Enabled").unwrap();
2372        let loaded = store.load().unwrap();
2373        let snap = loaded.buckets.get("b").unwrap();
2374        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2375    }
2376}