Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11    #[serde(default)]
12    pub account_id: String,
13    #[serde(default)]
14    pub region: String,
15    #[serde(default)]
16    pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21    pub meta: BucketMeta,
22    #[serde(default)]
23    pub objects: BTreeMap<String, LoadedObject>,
24    #[serde(default)]
25    pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
26    #[serde(default)]
27    pub subresources: BTreeMap<String, String>,
28    #[serde(default)]
29    pub multipart_uploads: BTreeMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34    pub init: MpuInit,
35    #[serde(default)]
36    pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41    pub meta: UploadPartMeta,
42    #[serde(default)]
43    pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48    pub meta: ObjectMeta,
49    #[serde(default)]
50    pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55    #[serde(default)]
56    pub name: String,
57    #[serde(default = "default_time")]
58    pub creation_date: DateTime<Utc>,
59    #[serde(default)]
60    pub region: String,
61    #[serde(default)]
62    pub versioning: Option<String>,
63    #[serde(default)]
64    pub acl: Option<String>,
65    #[serde(default)]
66    pub acl_owner_id: String,
67    #[serde(default)]
68    pub accelerate_status: Option<String>,
69    #[serde(default)]
70    pub eventbridge_enabled: bool,
71    #[serde(default)]
72    pub lifecycle_transition_default_min_size: Option<String>,
73}
74
75fn default_time() -> DateTime<Utc> {
76    DateTime::<Utc>::MIN_UTC
77}
78
79#[derive(Debug, Default, Clone, Serialize, Deserialize)]
80pub struct AclGrantSnapshot {
81    pub grantee_type: String,
82    #[serde(default)]
83    pub grantee_id: Option<String>,
84    #[serde(default)]
85    pub grantee_display_name: Option<String>,
86    #[serde(default)]
87    pub grantee_uri: Option<String>,
88    pub permission: String,
89}
90
91#[derive(Debug, Default, Clone, Serialize, Deserialize)]
92pub struct TagsSnapshot {
93    #[serde(default)]
94    pub tags: BTreeMap<String, String>,
95}
96
97#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct AclSnapshot {
99    #[serde(default)]
100    pub owner_id: String,
101    #[serde(default)]
102    pub grants: Vec<AclGrantSnapshot>,
103}
104
105#[derive(Debug, Default, Clone, Serialize, Deserialize)]
106pub struct InventorySnapshot {
107    #[serde(default)]
108    pub configs: BTreeMap<String, String>,
109}
110
111#[derive(Debug, Default, Clone, Serialize, Deserialize)]
112pub struct ObjectMeta {
113    #[serde(default)]
114    pub key: String,
115    #[serde(default)]
116    pub content_type: String,
117    #[serde(default)]
118    pub etag: String,
119    #[serde(default)]
120    pub size: u64,
121    #[serde(default = "default_time")]
122    pub last_modified: DateTime<Utc>,
123    #[serde(default)]
124    pub metadata: BTreeMap<String, String>,
125    #[serde(default)]
126    pub tags: BTreeMap<String, String>,
127    #[serde(default)]
128    pub storage_class: String,
129    #[serde(default)]
130    pub acl_grants: Vec<AclGrantSnapshot>,
131    #[serde(default)]
132    pub acl_owner_id: Option<String>,
133    #[serde(default)]
134    pub parts_count: Option<u32>,
135    #[serde(default)]
136    pub part_sizes: Option<Vec<(u32, u64)>>,
137    #[serde(default)]
138    pub sse_algorithm: Option<String>,
139    #[serde(default)]
140    pub sse_kms_key_id: Option<String>,
141    #[serde(default)]
142    pub bucket_key_enabled: Option<bool>,
143    #[serde(default)]
144    pub version_id: Option<String>,
145    #[serde(default)]
146    pub is_delete_marker: bool,
147    #[serde(default)]
148    pub restore_ongoing: Option<bool>,
149    #[serde(default)]
150    pub restore_expiry: Option<String>,
151    #[serde(default)]
152    pub checksum_algorithm: Option<String>,
153    #[serde(default)]
154    pub checksum_value: Option<String>,
155    #[serde(default)]
156    pub lock_mode: Option<String>,
157    #[serde(default)]
158    pub lock_retain_until: Option<DateTime<Utc>>,
159    #[serde(default)]
160    pub lock_legal_hold: Option<String>,
161    #[serde(default)]
162    pub content_encoding: Option<String>,
163    #[serde(default)]
164    pub website_redirect_location: Option<String>,
165}
166
167#[derive(Debug, Default, Clone, Serialize, Deserialize)]
168pub struct MpuInit {
169    pub upload_id: String,
170    pub key: String,
171    #[serde(default = "default_time")]
172    pub initiated: DateTime<Utc>,
173    #[serde(default)]
174    pub metadata: BTreeMap<String, String>,
175    #[serde(default)]
176    pub content_type: String,
177    #[serde(default)]
178    pub storage_class: String,
179    #[serde(default)]
180    pub sse_algorithm: Option<String>,
181    #[serde(default)]
182    pub sse_kms_key_id: Option<String>,
183    #[serde(default)]
184    pub tagging: Option<String>,
185    #[serde(default)]
186    pub acl_grants: Vec<AclGrantSnapshot>,
187    #[serde(default)]
188    pub checksum_algorithm: Option<String>,
189}
190
191#[derive(Debug, Default, Clone, Serialize, Deserialize)]
192pub struct UploadPartMeta {
193    pub part_number: u32,
194    pub etag: String,
195    pub size: u64,
196    #[serde(default = "default_time")]
197    pub last_modified: DateTime<Utc>,
198}
199
/// Bucket configuration documents that a store persists independently of
/// the bucket metadata. `DiskS3Store` maps each variant to its own
/// `<name>.toml` file inside the bucket directory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BucketSubresource {
    /// `tags.toml`
    Tags,
    /// `lifecycle.toml`
    Lifecycle,
    /// `cors.toml`
    Cors,
    /// `policy.toml`
    Policy,
    /// `notification.toml`
    Notification,
    /// `logging.toml`
    Logging,
    /// `website.toml`
    Website,
    /// `public_access_block.toml`
    PublicAccessBlock,
    /// `object_lock.toml`
    ObjectLock,
    /// `replication.toml`
    Replication,
    /// `ownership.toml`
    Ownership,
    /// `inventory.toml`
    Inventory,
    /// `encryption.toml`
    Encryption,
    /// `versioning.toml`
    Versioning,
    /// `acl.toml`
    Acl,
    /// `accelerate.toml`
    Accelerate,
    /// `analytics.toml`
    Analytics,
    /// `intelligent_tiering.toml`
    IntelligentTiering,
    /// `metrics.toml`
    Metrics,
    /// `request_payment.toml`
    RequestPayment,
    /// `abac.toml`
    Abac,
    /// `metadata_configuration.toml`
    MetadataConfiguration,
    /// `metadata_table_configuration.toml`
    MetadataTableConfiguration,
}
226
227pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
228    BucketSubresource::Tags,
229    BucketSubresource::Lifecycle,
230    BucketSubresource::Cors,
231    BucketSubresource::Policy,
232    BucketSubresource::Notification,
233    BucketSubresource::Logging,
234    BucketSubresource::Website,
235    BucketSubresource::PublicAccessBlock,
236    BucketSubresource::ObjectLock,
237    BucketSubresource::Replication,
238    BucketSubresource::Ownership,
239    BucketSubresource::Inventory,
240    BucketSubresource::Encryption,
241    BucketSubresource::Versioning,
242    BucketSubresource::Acl,
243    BucketSubresource::Accelerate,
244    BucketSubresource::Analytics,
245    BucketSubresource::IntelligentTiering,
246    BucketSubresource::Metrics,
247    BucketSubresource::RequestPayment,
248    BucketSubresource::Abac,
249    BucketSubresource::MetadataConfiguration,
250    BucketSubresource::MetadataTableConfiguration,
251];
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub enum BodyRef {
255    #[serde(skip)]
256    Memory(Bytes),
257    Disk {
258        bucket: String,
259        key: String,
260        #[serde(default)]
261        version: Option<String>,
262        path: PathBuf,
263        size: u64,
264    },
265}
266
267impl BodyRef {
268    pub fn size(&self) -> u64 {
269        match self {
270            BodyRef::Memory(b) => b.len() as u64,
271            BodyRef::Disk { size, .. } => *size,
272        }
273    }
274}
275
276impl Default for BodyRef {
277    fn default() -> Self {
278        BodyRef::Memory(Bytes::new())
279    }
280}
281
282#[derive(Debug)]
283pub enum BodySource {
284    Bytes(Bytes),
285    /// Existing disk path that should be *moved* into the destination via
286    /// rename (upload-tmp → final) and consumed.
287    File(PathBuf),
288    /// Existing disk path that should be *copied* into the destination and
289    /// left in place. Used by replication so the source object stays
290    /// available while the replica is produced.
291    FileCopy(PathBuf),
292}
293
294/// Materialize a [`BodySource`] into [`Bytes`]. The `File` variant is
295/// consumed (file is unlinked after read); `FileCopy` is left in place.
296/// Memory-mode stores call this to absorb tempfiles produced by the
297/// streaming dispatch path so streaming and non-streaming services share
298/// the same wire-shape regardless of `--storage-mode`.
299fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
300    match body {
301        BodySource::Bytes(b) => Ok(b),
302        BodySource::File(p) => {
303            let bytes = std::fs::read(&p)?;
304            // Best-effort: a leftover tempfile is not a correctness
305            // problem so the read result wins over the unlink result.
306            let _ = std::fs::remove_file(&p);
307            Ok(Bytes::from(bytes))
308        }
309        BodySource::FileCopy(p) => {
310            let bytes = std::fs::read(&p)?;
311            Ok(Bytes::from(bytes))
312        }
313    }
314}
315
316#[derive(Debug, Error)]
317pub enum StoreError {
318    #[error("io error: {0}")]
319    Io(#[from] std::io::Error),
320    #[error("serialization error: {0}")]
321    Serde(String),
322    #[error("not supported by this store")]
323    NotSupported,
324    #[error("{0}")]
325    Other(String),
326}
327
328pub type StoreResult<T> = Result<T, StoreError>;
329
330pub trait S3Store: Send + Sync {
331    fn load(&self) -> StoreResult<S3State>;
332
333    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
334    fn put_bucket_subresource(
335        &self,
336        bucket: &str,
337        kind: BucketSubresource,
338        payload: &str,
339    ) -> StoreResult<()>;
340    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
341    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
342
343    fn put_object(
344        &self,
345        bucket: &str,
346        key: &str,
347        version: Option<&str>,
348        body: BodySource,
349        meta: &ObjectMeta,
350    ) -> StoreResult<BodyRef>;
351    fn put_object_meta(
352        &self,
353        bucket: &str,
354        key: &str,
355        version: Option<&str>,
356        meta: &ObjectMeta,
357    ) -> StoreResult<()>;
358    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
359    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
360
361    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
362    /// Persist a multipart-upload part body. Returns the [`BodyRef`] the
363    /// service should keep in its in-memory part map: `BodyRef::Disk`
364    /// for disk-backed stores (so subsequent reads stream from the
365    /// `.bin` file instead of holding the part in RAM), or
366    /// `BodyRef::Memory` for memory-mode stores.
367    fn mpu_put_part(
368        &self,
369        bucket: &str,
370        upload_id: &str,
371        part_number: u32,
372        body: BodySource,
373        etag: &str,
374    ) -> StoreResult<BodyRef>;
375    /// Where the streaming dispatch path should spool large request
376    /// bodies to disk before handing them to [`Self::put_object`] /
377    /// [`Self::mpu_put_part`]. Returning `Some` keeps the spool file on
378    /// the same filesystem as the final destination so `rename(2)` is a
379    /// metadata-only move; returning `None` means "use the system temp
380    /// dir, the store will read the file back into RAM and unlink it".
381    fn spool_dir(&self) -> Option<std::path::PathBuf> {
382        None
383    }
384    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
385    fn mpu_complete(
386        &self,
387        bucket: &str,
388        upload_id: &str,
389        final_key: &str,
390        version: Option<&str>,
391        meta: &ObjectMeta,
392    ) -> StoreResult<BodyRef>;
393}
394
/// Stateless store used in memory mode: it persists nothing, and bodies stay
/// in RAM as `BodyRef::Memory`.
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Create the memory store (it carries no state).
    pub fn new() -> Self {
        MemoryS3Store
    }
}

impl Default for MemoryS3Store {
    fn default() -> Self {
        MemoryS3Store::new()
    }
}
408
409impl S3Store for MemoryS3Store {
410    fn load(&self) -> StoreResult<S3State> {
411        Ok(S3State::default())
412    }
413
414    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
415        Ok(())
416    }
417    fn put_bucket_subresource(
418        &self,
419        _bucket: &str,
420        _kind: BucketSubresource,
421        _payload: &str,
422    ) -> StoreResult<()> {
423        Ok(())
424    }
425    fn delete_bucket_subresource(
426        &self,
427        _bucket: &str,
428        _kind: BucketSubresource,
429    ) -> StoreResult<()> {
430        Ok(())
431    }
432    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
433        Ok(())
434    }
435
436    fn put_object(
437        &self,
438        _bucket: &str,
439        _key: &str,
440        _version: Option<&str>,
441        body: BodySource,
442        _meta: &ObjectMeta,
443    ) -> StoreResult<BodyRef> {
444        Ok(BodyRef::Memory(read_body_source(body)?))
445    }
446    fn put_object_meta(
447        &self,
448        _bucket: &str,
449        _key: &str,
450        _version: Option<&str>,
451        _meta: &ObjectMeta,
452    ) -> StoreResult<()> {
453        Ok(())
454    }
455    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
456        Ok(())
457    }
458    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
459        match body {
460            BodyRef::Memory(b) => Ok(b.clone()),
461            BodyRef::Disk { .. } => {
462                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
463            }
464        }
465    }
466
467    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
468        Ok(())
469    }
470    fn mpu_put_part(
471        &self,
472        _bucket: &str,
473        _upload_id: &str,
474        _part_number: u32,
475        body: BodySource,
476        _etag: &str,
477    ) -> StoreResult<BodyRef> {
478        // Memory mode keeps all part bodies in RAM. Absorb any tempfile
479        // produced by the streaming dispatch path here.
480        Ok(BodyRef::Memory(read_body_source(body)?))
481    }
482    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
483        Ok(())
484    }
485    fn mpu_complete(
486        &self,
487        _bucket: &str,
488        _upload_id: &str,
489        _final_key: &str,
490        _version: Option<&str>,
491        _meta: &ObjectMeta,
492    ) -> StoreResult<BodyRef> {
493        Ok(BodyRef::Memory(Bytes::new()))
494    }
495}
496
497pub struct DiskS3Store {
498    root: PathBuf,
499    cache: std::sync::Arc<crate::cache::BodyCache>,
500}
501
502impl DiskS3Store {
503    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
504        Self { root, cache }
505    }
506
507    fn buckets_dir(&self) -> PathBuf {
508        self.root.join("buckets")
509    }
510
511    fn bucket_dir(&self, bucket: &str) -> PathBuf {
512        self.buckets_dir()
513            .join(crate::key_escape::escape_key_segment(bucket))
514    }
515
516    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
517        self.bucket_dir(bucket)
518            .join("objects")
519            .join(crate::key_escape::escape_key_segment(key))
520    }
521
522    fn version_tag(version: Option<&str>) -> String {
523        version.unwrap_or("null").to_string()
524    }
525
526    fn object_paths(
527        &self,
528        bucket: &str,
529        key: &str,
530        version: Option<&str>,
531    ) -> (PathBuf, PathBuf, PathBuf) {
532        let dir = self.object_dir(bucket, key);
533        let tag = Self::version_tag(version);
534        let bin = dir.join(format!("{}.bin", tag));
535        let toml = dir.join(format!("{}.toml", tag));
536        (dir, bin, toml)
537    }
538
539    fn subresource_filename(kind: BucketSubresource) -> &'static str {
540        match kind {
541            BucketSubresource::Tags => "tags.toml",
542            BucketSubresource::Lifecycle => "lifecycle.toml",
543            BucketSubresource::Cors => "cors.toml",
544            BucketSubresource::Policy => "policy.toml",
545            BucketSubresource::Notification => "notification.toml",
546            BucketSubresource::Logging => "logging.toml",
547            BucketSubresource::Website => "website.toml",
548            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
549            BucketSubresource::ObjectLock => "object_lock.toml",
550            BucketSubresource::Replication => "replication.toml",
551            BucketSubresource::Ownership => "ownership.toml",
552            BucketSubresource::Inventory => "inventory.toml",
553            BucketSubresource::Encryption => "encryption.toml",
554            BucketSubresource::Versioning => "versioning.toml",
555            BucketSubresource::Acl => "acl.toml",
556            BucketSubresource::Accelerate => "accelerate.toml",
557            BucketSubresource::Analytics => "analytics.toml",
558            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
559            BucketSubresource::Metrics => "metrics.toml",
560            BucketSubresource::RequestPayment => "request_payment.toml",
561            BucketSubresource::Abac => "abac.toml",
562            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
563            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
564        }
565    }
566
567    fn cleanup_empty(dir: &std::path::Path) {
568        let _ = std::fs::remove_dir(dir);
569    }
570
571    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
572        self.bucket_dir(bucket)
573            .join("mpu")
574            .join(crate::key_escape::escape_key_segment(upload_id))
575    }
576
577    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
578        self.mpu_dir(bucket, upload_id).join("parts")
579    }
580
581    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
582        self.mpu_parts_dir(bucket, upload_id)
583            .join(format!("{}.bin", part_number))
584    }
585
586    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
587        self.mpu_parts_dir(bucket, upload_id)
588            .join(format!("{}.toml", part_number))
589    }
590
591    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
592        crate::cache::BodyKey::new(
593            bucket.to_string(),
594            format!("__mpu__/{}", upload_id),
595            Some(format!("part-{}", part_number)),
596        )
597    }
598}
599
600fn io_other(msg: impl Into<String>) -> StoreError {
601    StoreError::Other(msg.into())
602}
603
604impl S3Store for DiskS3Store {
605    fn load(&self) -> StoreResult<S3State> {
606        let mut state = S3State::default();
607        let buckets_dir = self.buckets_dir();
608        if !buckets_dir.exists() {
609            return Ok(state);
610        }
611        for entry in std::fs::read_dir(&buckets_dir)? {
612            let entry = entry?;
613            if !entry.file_type()?.is_dir() {
614                continue;
615            }
616            let bdir = entry.path();
617            // Isolate per-bucket load so one corrupt/partial bucket is skipped
618            // with a warning instead of aborting the whole store load -- a single
619            // truncated file used to make every bucket inaccessible (bug-audit
620            // 2026-06-20, 4.3).
621            let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
622                let meta_path = bdir.join("meta.toml");
623                if !meta_path.exists() {
624                    return Ok(None);
625                }
626                let meta_text = std::fs::read_to_string(&meta_path)?;
627                let mut meta: BucketMeta =
628                    toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
629                let mut snap = BucketSnapshot {
630                    meta: meta.clone(),
631                    objects: BTreeMap::new(),
632                    object_versions: BTreeMap::new(),
633                    subresources: BTreeMap::new(),
634                    multipart_uploads: BTreeMap::new(),
635                };
636
637                for kind in ALL_SUBRESOURCES {
638                    let fname = Self::subresource_filename(*kind);
639                    let path = bdir.join(fname);
640                    if path.exists() {
641                        let text = std::fs::read_to_string(&path)?;
642                        if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
643                        {
644                            let stripped = text.trim();
645                            if !stripped.is_empty() {
646                                snap.meta.versioning = Some(stripped.to_string());
647                                meta.versioning = snap.meta.versioning.clone();
648                            }
649                        }
650                        snap.subresources.insert(fname.to_string(), text);
651                    }
652                }
653
654                let objects_root = bdir.join("objects");
655                if objects_root.exists() {
656                    for okey_entry in std::fs::read_dir(&objects_root)? {
657                        let okey_entry = okey_entry?;
658                        if !okey_entry.file_type()?.is_dir() {
659                            continue;
660                        }
661                        let key_dir = okey_entry.path();
662                        let mut versioned: Vec<LoadedObject> = Vec::new();
663                        let mut key_name: Option<String> = None;
664                        for version_entry in std::fs::read_dir(&key_dir)? {
665                            let version_entry = version_entry?;
666                            let path = version_entry.path();
667                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
668                                continue;
669                            };
670                            if !fname.ends_with(".toml") {
671                                continue;
672                            }
673                            let version_tag = &fname[..fname.len() - 5];
674                            let toml_text = std::fs::read_to_string(&path)?;
675                            let obj_meta: ObjectMeta = toml::from_str(&toml_text)
676                                .map_err(|e| StoreError::Serde(e.to_string()))?;
677                            let bin_path = key_dir.join(format!("{}.bin", version_tag));
678                            let (body, size) = if obj_meta.is_delete_marker {
679                                (BodyRef::Memory(Bytes::new()), 0u64)
680                            } else if bin_path.exists() {
681                                let sz = std::fs::metadata(&bin_path)?.len();
682                                (
683                                    BodyRef::Disk {
684                                        bucket: meta.name.clone(),
685                                        key: obj_meta.key.clone(),
686                                        version: if version_tag == "null" {
687                                            None
688                                        } else {
689                                            Some(version_tag.to_string())
690                                        },
691                                        path: bin_path,
692                                        size: sz,
693                                    },
694                                    sz,
695                                )
696                            } else {
697                                // Fail loud: the sidecar says this object has a
698                                // body but the .bin file is missing. Returning
699                                // silently would hand the caller a truncated
700                                // object and hide data loss.
701                                return Err(StoreError::Other(format!(
702                                    "missing body file: {}",
703                                    bin_path.display()
704                                )));
705                            };
706                            let _ = size;
707                            key_name.get_or_insert_with(|| obj_meta.key.clone());
708                            if version_tag == "null" && obj_meta.version_id.is_none() {
709                                snap.objects.insert(
710                                    obj_meta.key.clone(),
711                                    LoadedObject {
712                                        meta: obj_meta,
713                                        body,
714                                    },
715                                );
716                            } else {
717                                versioned.push(LoadedObject {
718                                    meta: obj_meta,
719                                    body,
720                                });
721                            }
722                        }
723                        if !versioned.is_empty() {
724                            versioned.sort_by_key(|v| v.meta.last_modified);
725                            if let Some(key) = key_name {
726                                // Reconcile snap.objects with the newest version:
727                                // a trailing delete marker hides any prior null or
728                                // live version (remove it); otherwise overwrite
729                                // with the newest live version, even if a
730                                // pre-versioning null.toml had already been
731                                // inserted during the non-versioned scan.
732                                match versioned.last() {
733                                    Some(newest) if newest.meta.is_delete_marker => {
734                                        snap.objects.remove(&key);
735                                    }
736                                    Some(newest) => {
737                                        snap.objects.insert(key.clone(), newest.clone());
738                                    }
739                                    None => {}
740                                }
741                                snap.object_versions.insert(key, versioned);
742                            }
743                        }
744                    }
745                }
746
747                let mpu_root = bdir.join("mpu");
748                if mpu_root.exists() {
749                    for upload_entry in std::fs::read_dir(&mpu_root)? {
750                        let upload_entry = upload_entry?;
751                        if !upload_entry.file_type()?.is_dir() {
752                            continue;
753                        }
754                        let upload_dir = upload_entry.path();
755                        let init_path = upload_dir.join("init.toml");
756                        if !init_path.exists() {
757                            continue;
758                        }
759                        let init_text = std::fs::read_to_string(&init_path)?;
760                        let init: MpuInit = toml::from_str(&init_text)
761                            .map_err(|e| StoreError::Serde(e.to_string()))?;
762                        let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
763                        let parts_dir = upload_dir.join("parts");
764                        if parts_dir.exists() {
765                            for part_entry in std::fs::read_dir(&parts_dir)? {
766                                let part_entry = part_entry?;
767                                let path = part_entry.path();
768                                let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
769                                    continue;
770                                };
771                                if !fname.ends_with(".toml") {
772                                    continue;
773                                }
774                                let stem = &fname[..fname.len() - 5];
775                                let Ok(part_number) = stem.parse::<u32>() else {
776                                    continue;
777                                };
778                                let toml_text = std::fs::read_to_string(&path)?;
779                                let part_meta: UploadPartMeta = toml::from_str(&toml_text)
780                                    .map_err(|e| StoreError::Serde(e.to_string()))?;
781                                let bin_path = parts_dir.join(format!("{}.bin", part_number));
782                                if !bin_path.exists() {
783                                    return Err(StoreError::Other(format!(
784                                        "missing multipart part body file: {}",
785                                        bin_path.display()
786                                    )));
787                                }
788                                let sz = std::fs::metadata(&bin_path)?.len();
789                                let body = BodyRef::Disk {
790                                    bucket: meta.name.clone(),
791                                    key: format!("__mpu__/{}", init.upload_id),
792                                    version: Some(format!("part-{}", part_number)),
793                                    path: bin_path,
794                                    size: sz,
795                                };
796                                loaded_parts.insert(
797                                    part_number,
798                                    LoadedPart {
799                                        meta: part_meta,
800                                        body,
801                                    },
802                                );
803                            }
804                        }
805                        snap.multipart_uploads.insert(
806                            init.upload_id.clone(),
807                            LoadedMpu {
808                                init,
809                                parts: loaded_parts,
810                            },
811                        );
812                    }
813                }
814
815                Ok(Some(snap))
816            })();
817            match loaded {
818                Ok(Some(snap)) => {
819                    state.buckets.insert(snap.meta.name.clone(), snap);
820                }
821                Ok(None) => {}
822                Err(e) => tracing::warn!(
823                    bucket = %bdir.display(),
824                    error = %e,
825                    "skipping unreadable S3 bucket during load"
826                ),
827            }
828        }
829        Ok(state)
830    }
831
832    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
833        let dir = self.bucket_dir(bucket);
834        std::fs::create_dir_all(&dir)?;
835        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
836        Ok(())
837    }
838
839    fn put_bucket_subresource(
840        &self,
841        bucket: &str,
842        kind: BucketSubresource,
843        payload: &str,
844    ) -> StoreResult<()> {
845        let dir = self.bucket_dir(bucket);
846        std::fs::create_dir_all(&dir)?;
847        let path = dir.join(Self::subresource_filename(kind));
848        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
849        Ok(())
850    }
851
852    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
853        let path = self
854            .bucket_dir(bucket)
855            .join(Self::subresource_filename(kind));
856        match std::fs::remove_file(&path) {
857            Ok(_) => Ok(()),
858            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
859            Err(e) => Err(e.into()),
860        }
861    }
862
863    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
864        let dir = self.bucket_dir(bucket);
865        match std::fs::remove_dir_all(&dir) {
866            Ok(_) => Ok(()),
867            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
868            Err(e) => Err(e.into()),
869        }
870    }
871
872    fn put_object(
873        &self,
874        bucket: &str,
875        key: &str,
876        version: Option<&str>,
877        body: BodySource,
878        meta: &ObjectMeta,
879    ) -> StoreResult<BodyRef> {
880        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
881        std::fs::create_dir_all(&dir)?;
882
883        if meta.is_delete_marker {
884            crate::atomic::write_atomic_toml(&toml_path, meta)?;
885            return Ok(BodyRef::Memory(Bytes::new()));
886        }
887
888        let size: u64;
889        let bytes_for_cache: Option<Bytes>;
890        match body {
891            BodySource::Bytes(b) => {
892                size = b.len() as u64;
893                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
894                bytes_for_cache = Some(b);
895            }
896            BodySource::File(src) => {
897                let src_size = std::fs::metadata(&src)?.len();
898                size = src_size;
899                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
900                bytes_for_cache = None;
901            }
902            BodySource::FileCopy(src) => {
903                let src_size = std::fs::metadata(&src)?.len();
904                size = src_size;
905                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
906                bytes_for_cache = None;
907            }
908        }
909
910        crate::atomic::write_atomic_toml(&toml_path, meta)?;
911
912        let body_key = crate::cache::BodyKey::new(
913            bucket.to_string(),
914            key.to_string(),
915            version.map(|s| s.to_string()),
916        );
917        if let Some(b) = bytes_for_cache {
918            self.cache.insert(body_key, b);
919        } else {
920            self.cache.invalidate(&crate::cache::BodyKey::new(
921                bucket.to_string(),
922                key.to_string(),
923                version.map(|s| s.to_string()),
924            ));
925        }
926
927        Ok(BodyRef::Disk {
928            bucket: bucket.to_string(),
929            key: key.to_string(),
930            version: version.map(|s| s.to_string()),
931            path: bin_path,
932            size,
933        })
934    }
935
936    fn put_object_meta(
937        &self,
938        bucket: &str,
939        key: &str,
940        version: Option<&str>,
941        meta: &ObjectMeta,
942    ) -> StoreResult<()> {
943        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
944        std::fs::create_dir_all(&dir)?;
945        crate::atomic::write_atomic_toml(&toml_path, meta)?;
946        Ok(())
947    }
948
949    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
950        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
951        for p in [&bin_path, &toml_path] {
952            match std::fs::remove_file(p) {
953                Ok(_) => {}
954                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
955                Err(e) => return Err(e.into()),
956            }
957        }
958        Self::cleanup_empty(&dir);
959
960        self.cache.invalidate(&crate::cache::BodyKey::new(
961            bucket.to_string(),
962            key.to_string(),
963            version.map(|s| s.to_string()),
964        ));
965        Ok(())
966    }
967
968    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
969        match body {
970            BodyRef::Memory(b) => Ok(b.clone()),
971            BodyRef::Disk {
972                bucket,
973                key,
974                version,
975                path,
976                size: _,
977            } => {
978                let body_key =
979                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
980                if let Some(bytes) = self.cache.get(&body_key) {
981                    return Ok(bytes);
982                }
983                let bytes = Bytes::from(std::fs::read(path)?);
984                self.cache.insert(body_key, bytes.clone());
985                Ok(bytes)
986            }
987        }
988    }
989
990    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
991        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
992        std::fs::create_dir_all(&parts_dir)?;
993        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
994        crate::atomic::write_atomic_toml(&init_path, init)?;
995        Ok(())
996    }
997
998    fn mpu_put_part(
999        &self,
1000        bucket: &str,
1001        upload_id: &str,
1002        part_number: u32,
1003        body: BodySource,
1004        etag: &str,
1005    ) -> StoreResult<BodyRef> {
1006        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1007        std::fs::create_dir_all(&parts_dir)?;
1008        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1009        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1010
1011        let size: u64 = match body {
1012            BodySource::Bytes(b) => {
1013                let n = b.len() as u64;
1014                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1015                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1016                self.cache.insert(cache_key, b);
1017                n
1018            }
1019            BodySource::File(src) => {
1020                let n = std::fs::metadata(&src)?.len();
1021                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1022                self.cache
1023                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1024                n
1025            }
1026            BodySource::FileCopy(src) => {
1027                let n = std::fs::metadata(&src)?.len();
1028                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1029                self.cache
1030                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1031                n
1032            }
1033        };
1034
1035        let meta = UploadPartMeta {
1036            part_number,
1037            etag: etag.to_string(),
1038            size,
1039            last_modified: Utc::now(),
1040        };
1041        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1042        // Disk-mode parts live on disk; expose them as BodyRef::Disk so
1043        // the runtime state never holds the part body in RAM.
1044        Ok(BodyRef::Disk {
1045            bucket: bucket.to_string(),
1046            key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1047            version: None,
1048            path: bin_path,
1049            size,
1050        })
1051    }
1052
1053    fn spool_dir(&self) -> Option<std::path::PathBuf> {
1054        let dir = self.root.join(".spool");
1055        // Best-effort: create lazily; the spool helper also creates if
1056        // missing. Returning the path even when create fails is safe —
1057        // the helper handles both create and the actual file open.
1058        let _ = std::fs::create_dir_all(&dir);
1059        Some(dir)
1060    }
1061
1062    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1063        let dir = self.mpu_dir(bucket, upload_id);
1064        // Invalidate any cached part bodies for this upload.
1065        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1066            for entry in entries.flatten() {
1067                let path = entry.path();
1068                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1069                    if let Some(stem) = fname.strip_suffix(".bin") {
1070                        if let Ok(n) = stem.parse::<u32>() {
1071                            self.cache
1072                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1073                        }
1074                    }
1075                }
1076            }
1077        }
1078        match std::fs::remove_dir_all(&dir) {
1079            Ok(_) => Ok(()),
1080            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1081            Err(e) => Err(e.into()),
1082        }
1083    }
1084
1085    fn mpu_complete(
1086        &self,
1087        bucket: &str,
1088        upload_id: &str,
1089        final_key: &str,
1090        version: Option<&str>,
1091        meta: &ObjectMeta,
1092    ) -> StoreResult<BodyRef> {
1093        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1094        if !parts_dir.exists() {
1095            return Err(io_other(format!(
1096                "mpu_complete: no parts dir for upload {}",
1097                upload_id
1098            )));
1099        }
1100
1101        // Enumerate parts in ascending part-number order.
1102        let mut part_numbers: Vec<u32> = Vec::new();
1103        for entry in std::fs::read_dir(&parts_dir)? {
1104            let entry = entry?;
1105            let path = entry.path();
1106            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1107                continue;
1108            };
1109            if let Some(stem) = fname.strip_suffix(".toml") {
1110                if let Ok(n) = stem.parse::<u32>() {
1111                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1112                        part_numbers.push(n);
1113                    }
1114                }
1115            }
1116        }
1117        part_numbers.sort_unstable();
1118        if part_numbers.is_empty() {
1119            return Err(io_other(format!(
1120                "mpu_complete: upload {} has no parts",
1121                upload_id
1122            )));
1123        }
1124
1125        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1126        std::fs::create_dir_all(&dir)?;
1127
1128        let total_size: u64 = if part_numbers.len() == 1 {
1129            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1130            let sz = std::fs::metadata(&only)?.len();
1131            match std::fs::rename(&only, &bin_path) {
1132                Ok(_) => {}
1133                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1134                    // Cross-device rename: fall back to streaming copy then remove source.
1135                    {
1136                        let mut input = std::fs::File::open(&only)?;
1137                        let tmp = {
1138                            let mut os = bin_path.as_os_str().to_owned();
1139                            os.push(".tmp");
1140                            PathBuf::from(os)
1141                        };
1142                        let mut out = std::fs::OpenOptions::new()
1143                            .write(true)
1144                            .create(true)
1145                            .truncate(true)
1146                            .open(&tmp)?;
1147                        std::io::copy(&mut input, &mut out)?;
1148                        out.sync_all()?;
1149                        std::fs::rename(&tmp, &bin_path)?;
1150                    }
1151                    let _ = std::fs::remove_file(&only);
1152                }
1153                Err(e) => return Err(e.into()),
1154            }
1155            // fsync the parent directory so the rename that links the completed
1156            // object is durable, matching the multi-part branch below. Without
1157            // it a power-loss could leave a 200-OK'd single-part MPU not durably
1158            // linked, poisoning the bucket load (bug-audit 2026-06-20, 4.4).
1159            if let Some(parent) = bin_path.parent() {
1160                if let Ok(dir_handle) = std::fs::File::open(parent) {
1161                    let _ = dir_handle.sync_all();
1162                }
1163            }
1164            sz
1165        } else {
1166            let tmp = {
1167                let mut os = bin_path.as_os_str().to_owned();
1168                os.push(".tmp");
1169                PathBuf::from(os)
1170            };
1171            let mut total: u64 = 0;
1172            {
1173                let mut out = std::fs::OpenOptions::new()
1174                    .write(true)
1175                    .create(true)
1176                    .truncate(true)
1177                    .open(&tmp)?;
1178                for n in &part_numbers {
1179                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1180                    let mut input = std::fs::File::open(&part_path)?;
1181                    let copied = std::io::copy(&mut input, &mut out)?;
1182                    total += copied;
1183                }
1184                out.sync_all()?;
1185            }
1186            std::fs::rename(&tmp, &bin_path)?;
1187            if let Some(parent) = bin_path.parent() {
1188                if let Ok(dir_handle) = std::fs::File::open(parent) {
1189                    let _ = dir_handle.sync_all();
1190                }
1191            }
1192            total
1193        };
1194
1195        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1196
1197        // Invalidate per-part cache entries and drop the mpu dir.
1198        for n in &part_numbers {
1199            self.cache
1200                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1201        }
1202        let mpu_dir = self.mpu_dir(bucket, upload_id);
1203        let _ = std::fs::remove_dir_all(&mpu_dir);
1204
1205        // The concatenated body is deliberately NOT re-inserted into the cache —
1206        // large multipart uploads typically exceed the single-object cap, and we
1207        // must never round-trip the assembled body through RAM. The next
1208        // open_object_body call will load through the normal cache path.
1209        self.cache.invalidate(&crate::cache::BodyKey::new(
1210            bucket.to_string(),
1211            final_key.to_string(),
1212            version.map(|s| s.to_string()),
1213        ));
1214
1215        Ok(BodyRef::Disk {
1216            bucket: bucket.to_string(),
1217            key: final_key.to_string(),
1218            version: version.map(|s| s.to_string()),
1219            path: bin_path,
1220            size: total_size,
1221        })
1222    }
1223}
1224
/// Raw OS error code that `std::fs::rename` reports when the source and
/// destination live on different filesystems/volumes.
///
/// On Unix this is `EXDEV` (18 on Linux, macOS and the BSDs). Windows reports
/// cross-volume moves as `ERROR_NOT_SAME_DEVICE` (17) instead. Returning the
/// Unix value there would mean the cross-device copy fallback never triggers.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        // ERROR_NOT_SAME_DEVICE
        17
    } else {
        // EXDEV
        18
    }
}
1228
1229#[cfg(test)]
1230mod disk_tests {
1231    use super::*;
1232    use std::sync::Arc;
1233    use tempfile::TempDir;
1234
1235    fn new_store(tmp: &TempDir) -> DiskS3Store {
1236        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1237        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1238    }
1239
1240    fn new_store_with_cache(
1241        tmp: &TempDir,
1242        cap: u64,
1243    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1244        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1245        (
1246            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1247            cache,
1248        )
1249    }
1250
1251    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1252        ObjectMeta {
1253            key: key.to_string(),
1254            content_type: "application/octet-stream".to_string(),
1255            etag: "etag".to_string(),
1256            size,
1257            ..Default::default()
1258        }
1259    }
1260
1261    #[test]
1262    fn put_bucket_meta_roundtrip() {
1263        let tmp = TempDir::new().unwrap();
1264        let store = new_store(&tmp);
1265        let meta = BucketMeta {
1266            name: "b1".to_string(),
1267            region: "us-east-1".to_string(),
1268            versioning: Some("Enabled".to_string()),
1269            ..Default::default()
1270        };
1271        store.put_bucket_meta("b1", &meta).unwrap();
1272        let loaded = store.load().unwrap();
1273        let snap = loaded.buckets.get("b1").unwrap();
1274        assert_eq!(snap.meta.name, "b1");
1275        assert_eq!(snap.meta.region, "us-east-1");
1276        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1277    }
1278
1279    #[test]
1280    fn put_bucket_subresource_each_variant_writes_file() {
1281        let tmp = TempDir::new().unwrap();
1282        let store = new_store(&tmp);
1283        store
1284            .put_bucket_meta(
1285                "b",
1286                &BucketMeta {
1287                    name: "b".to_string(),
1288                    ..Default::default()
1289                },
1290            )
1291            .unwrap();
1292        let variants = [
1293            BucketSubresource::Tags,
1294            BucketSubresource::Lifecycle,
1295            BucketSubresource::Cors,
1296            BucketSubresource::Policy,
1297            BucketSubresource::Notification,
1298            BucketSubresource::Logging,
1299            BucketSubresource::Website,
1300            BucketSubresource::PublicAccessBlock,
1301            BucketSubresource::ObjectLock,
1302            BucketSubresource::Replication,
1303            BucketSubresource::Ownership,
1304            BucketSubresource::Inventory,
1305            BucketSubresource::Encryption,
1306            BucketSubresource::Versioning,
1307            BucketSubresource::Acl,
1308            BucketSubresource::Accelerate,
1309        ];
1310        for v in variants {
1311            store
1312                .put_bucket_subresource("b", v, "payload=true")
1313                .unwrap();
1314            let file = store
1315                .bucket_dir("b")
1316                .join(DiskS3Store::subresource_filename(v));
1317            assert!(file.exists(), "{:?}", v);
1318            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1319        }
1320    }
1321
1322    #[test]
1323    fn delete_bucket_subresource_removes_file() {
1324        let tmp = TempDir::new().unwrap();
1325        let store = new_store(&tmp);
1326        store
1327            .put_bucket_meta(
1328                "b",
1329                &BucketMeta {
1330                    name: "b".to_string(),
1331                    ..Default::default()
1332                },
1333            )
1334            .unwrap();
1335        store
1336            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1337            .unwrap();
1338        store
1339            .delete_bucket_subresource("b", BucketSubresource::Tags)
1340            .unwrap();
1341        let file = store.bucket_dir("b").join("tags.toml");
1342        assert!(!file.exists());
1343        // idempotent
1344        store
1345            .delete_bucket_subresource("b", BucketSubresource::Tags)
1346            .unwrap();
1347    }
1348
1349    #[test]
1350    fn put_object_bytes_roundtrip() {
1351        let tmp = TempDir::new().unwrap();
1352        let store = new_store(&tmp);
1353        store
1354            .put_bucket_meta(
1355                "b",
1356                &BucketMeta {
1357                    name: "b".to_string(),
1358                    ..Default::default()
1359                },
1360            )
1361            .unwrap();
1362        let data = Bytes::from_static(b"hello world");
1363        let meta = sample_meta("k1", data.len() as u64);
1364        let body_ref = store
1365            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1366            .unwrap();
1367        match &body_ref {
1368            BodyRef::Disk {
1369                bucket,
1370                key,
1371                size,
1372                path,
1373                ..
1374            } => {
1375                assert_eq!(bucket, "b");
1376                assert_eq!(key, "k1");
1377                assert_eq!(*size, data.len() as u64);
1378                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1379            }
1380            _ => panic!("expected Disk"),
1381        }
1382        let loaded = store.load().unwrap();
1383        let snap = loaded.buckets.get("b").unwrap();
1384        let obj = snap.objects.get("k1").unwrap();
1385        assert_eq!(obj.meta.size, data.len() as u64);
1386    }
1387
1388    #[test]
1389    fn put_object_file_copy_source_leaves_src_in_place() {
1390        let tmp = TempDir::new().unwrap();
1391        let store = new_store(&tmp);
1392        store
1393            .put_bucket_meta(
1394                "b",
1395                &BucketMeta {
1396                    name: "b".to_string(),
1397                    ..Default::default()
1398                },
1399            )
1400            .unwrap();
1401        let src_dir = TempDir::new().unwrap();
1402        let src = src_dir.path().join("src.bin");
1403        std::fs::write(&src, b"copied-body").unwrap();
1404        let meta = sample_meta("k", 11);
1405        let bref = store
1406            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1407            .unwrap();
1408        match bref {
1409            BodyRef::Disk { path, size, .. } => {
1410                assert_eq!(size, 11);
1411                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1412            }
1413            _ => panic!("expected Disk bodyref"),
1414        }
1415        assert!(src.exists(), "source file must not be moved by FileCopy");
1416        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1417    }
1418
1419    #[test]
1420    fn put_object_file_source() {
1421        let tmp = TempDir::new().unwrap();
1422        let store = new_store(&tmp);
1423        store
1424            .put_bucket_meta(
1425                "b",
1426                &BucketMeta {
1427                    name: "b".to_string(),
1428                    ..Default::default()
1429                },
1430            )
1431            .unwrap();
1432        let src = tmp.path().join("src.bin");
1433        std::fs::write(&src, b"file-body").unwrap();
1434        let meta = sample_meta("k", 9);
1435        let body_ref = store
1436            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1437            .unwrap();
1438        let path = match body_ref {
1439            BodyRef::Disk { path, .. } => path,
1440            _ => panic!(),
1441        };
1442        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1443    }
1444
1445    #[test]
1446    fn put_object_meta_only_keeps_bin() {
1447        let tmp = TempDir::new().unwrap();
1448        let store = new_store(&tmp);
1449        store
1450            .put_bucket_meta(
1451                "b",
1452                &BucketMeta {
1453                    name: "b".to_string(),
1454                    ..Default::default()
1455                },
1456            )
1457            .unwrap();
1458        let data = Bytes::from_static(b"abc");
1459        let mut meta = sample_meta("k", 3);
1460        store
1461            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1462            .unwrap();
1463        let (_, bin, _) = store.object_paths("b", "k", None);
1464        let before = std::fs::read(&bin).unwrap();
1465        meta.tags.insert("x".to_string(), "y".to_string());
1466        store.put_object_meta("b", "k", None, &meta).unwrap();
1467        assert_eq!(std::fs::read(&bin).unwrap(), before);
1468        let loaded = store.load().unwrap();
1469        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1470        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1471    }
1472
1473    #[test]
1474    fn delete_object_cleans_up_files_and_cache() {
1475        let tmp = TempDir::new().unwrap();
1476        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1477        store
1478            .put_bucket_meta(
1479                "b",
1480                &BucketMeta {
1481                    name: "b".to_string(),
1482                    ..Default::default()
1483                },
1484            )
1485            .unwrap();
1486        let data = Bytes::from_static(b"bye");
1487        let meta = sample_meta("k", 3);
1488        store
1489            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1490            .unwrap();
1491        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1492        assert!(cache.get(&body_key).is_some());
1493        store.delete_object("b", "k", None).unwrap();
1494        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1495        assert!(!bin.exists());
1496        assert!(!toml_path.exists());
1497        assert!(!dir.exists());
1498        assert!(cache.get(&body_key).is_none());
1499    }
1500
1501    #[test]
1502    fn open_object_body_cache_hit_and_refill() {
1503        let tmp = TempDir::new().unwrap();
1504        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1505        store
1506            .put_bucket_meta(
1507                "b",
1508                &BucketMeta {
1509                    name: "b".to_string(),
1510                    ..Default::default()
1511                },
1512            )
1513            .unwrap();
1514        let data = Bytes::from_static(b"payload");
1515        let meta = sample_meta("k", data.len() as u64);
1516        let body_ref = store
1517            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1518            .unwrap();
1519        // Cache hit.
1520        let got = store.open_object_body(&body_ref).unwrap();
1521        assert_eq!(got, data);
1522        // Invalidate and re-read populates cache from disk.
1523        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1524        cache.invalidate(&body_key);
1525        assert!(cache.get(&body_key).is_none());
1526        let got = store.open_object_body(&body_ref).unwrap();
1527        assert_eq!(got, data);
1528        assert!(cache.get(&body_key).is_some());
1529    }
1530
1531    #[test]
1532    fn open_object_body_large_bypasses_cache() {
1533        let tmp = TempDir::new().unwrap();
1534        // capacity 1024 → single-object cap 512. Use 800-byte body.
1535        let (store, cache) = new_store_with_cache(&tmp, 1024);
1536        store
1537            .put_bucket_meta(
1538                "b",
1539                &BucketMeta {
1540                    name: "b".to_string(),
1541                    ..Default::default()
1542                },
1543            )
1544            .unwrap();
1545        let data = Bytes::from(vec![7u8; 800]);
1546        let meta = sample_meta("big", 800);
1547        let body_ref = store
1548            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1549            .unwrap();
1550        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1551        assert!(cache.get(&body_key).is_none());
1552        let got = store.open_object_body(&body_ref).unwrap();
1553        assert_eq!(got, data);
1554        // Still none — exceeds single-object cap.
1555        assert!(cache.get(&body_key).is_none());
1556    }
1557
1558    #[test]
1559    fn load_empty_dir() {
1560        let tmp = TempDir::new().unwrap();
1561        let store = new_store(&tmp);
1562        let s = store.load().unwrap();
1563        assert!(s.buckets.is_empty());
1564    }
1565
1566    #[test]
1567    fn load_skips_mpu_without_init() {
1568        let tmp = TempDir::new().unwrap();
1569        let store = new_store(&tmp);
1570        store
1571            .put_bucket_meta(
1572                "b",
1573                &BucketMeta {
1574                    name: "b".to_string(),
1575                    ..Default::default()
1576                },
1577            )
1578            .unwrap();
1579        let data = Bytes::from_static(b"x");
1580        let meta = sample_meta("k", 1);
1581        store
1582            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1583            .unwrap();
1584        // Directory with no init.toml is skipped by load.
1585        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1586        std::fs::create_dir_all(&mpu).unwrap();
1587
1588        let loaded = store.load().unwrap();
1589        let snap = loaded.buckets.get("b").unwrap();
1590        assert_eq!(snap.objects.len(), 1);
1591        assert!(snap.multipart_uploads.is_empty());
1592    }
1593
1594    #[test]
1595    fn load_reads_bucket_subresources() {
1596        let tmp = TempDir::new().unwrap();
1597        let store = new_store(&tmp);
1598        store
1599            .put_bucket_meta(
1600                "b",
1601                &BucketMeta {
1602                    name: "b".to_string(),
1603                    ..Default::default()
1604                },
1605            )
1606            .unwrap();
1607        store
1608            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1609            .unwrap();
1610        store
1611            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1612            .unwrap();
1613        store
1614            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1615            .unwrap();
1616        store
1617            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1618            .unwrap();
1619        let loaded = store.load().unwrap();
1620        let snap = loaded.buckets.get("b").unwrap();
1621        assert_eq!(
1622            snap.subresources.get("lifecycle.toml").map(String::as_str),
1623            Some("<Lifecycle/>"),
1624        );
1625        assert_eq!(
1626            snap.subresources.get("cors.toml").map(String::as_str),
1627            Some("<Cors/>"),
1628        );
1629        assert_eq!(
1630            snap.subresources.get("policy.toml").map(String::as_str),
1631            Some("{}"),
1632        );
1633        assert_eq!(
1634            snap.subresources.get("tags.toml").map(String::as_str),
1635            Some("x=1"),
1636        );
1637    }
1638
1639    #[test]
1640    fn versioned_put_load_roundtrip() {
1641        let tmp = TempDir::new().unwrap();
1642        let store = new_store(&tmp);
1643        store
1644            .put_bucket_meta(
1645                "b",
1646                &BucketMeta {
1647                    name: "b".to_string(),
1648                    versioning: Some("Enabled".to_string()),
1649                    ..Default::default()
1650                },
1651            )
1652            .unwrap();
1653        let base = chrono::Utc::now();
1654        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1655            .iter()
1656            .enumerate()
1657        {
1658            let mut m = sample_meta("k", body.len() as u64);
1659            m.version_id = Some((*vid).to_string());
1660            m.last_modified = base + chrono::Duration::seconds(i as i64);
1661            store
1662                .put_object(
1663                    "b",
1664                    "k",
1665                    Some(*vid),
1666                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1667                    &m,
1668                )
1669                .unwrap();
1670        }
1671        let loaded = store.load().unwrap();
1672        let snap = loaded.buckets.get("b").unwrap();
1673        let versions = snap.object_versions.get("k").expect("versions present");
1674        assert_eq!(versions.len(), 3);
1675        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1676        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1677        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1678        for v in versions {
1679            match &v.body {
1680                BodyRef::Disk { size, .. } => assert!(*size > 0),
1681                _ => panic!("expected Disk body"),
1682            }
1683        }
1684    }
1685
1686    #[test]
1687    fn versioned_load_promotes_latest_live_to_snap_objects() {
1688        let tmp = TempDir::new().unwrap();
1689        let store = new_store(&tmp);
1690        store
1691            .put_bucket_meta(
1692                "b",
1693                &BucketMeta {
1694                    name: "b".to_string(),
1695                    versioning: Some("Enabled".to_string()),
1696                    ..Default::default()
1697                },
1698            )
1699            .unwrap();
1700        let base = chrono::Utc::now();
1701        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1702            .iter()
1703            .enumerate()
1704        {
1705            let mut m = sample_meta("k", body.len() as u64);
1706            m.version_id = Some((*vid).to_string());
1707            m.last_modified = base + chrono::Duration::seconds(i as i64);
1708            store
1709                .put_object(
1710                    "b",
1711                    "k",
1712                    Some(*vid),
1713                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1714                    &m,
1715                )
1716                .unwrap();
1717        }
1718        let loaded = store.load().unwrap();
1719        let snap = loaded.buckets.get("b").unwrap();
1720        let current = snap.objects.get("k").expect("current object promoted");
1721        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1722    }
1723
1724    #[test]
1725    fn versioned_load_trailing_delete_marker_hides_current() {
1726        let tmp = TempDir::new().unwrap();
1727        let store = new_store(&tmp);
1728        store
1729            .put_bucket_meta(
1730                "b",
1731                &BucketMeta {
1732                    name: "b".to_string(),
1733                    versioning: Some("Enabled".to_string()),
1734                    ..Default::default()
1735                },
1736            )
1737            .unwrap();
1738        let base = chrono::Utc::now();
1739        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1740            .iter()
1741            .enumerate()
1742        {
1743            let mut m = sample_meta("k", body.len() as u64);
1744            m.version_id = Some((*vid).to_string());
1745            m.last_modified = base + chrono::Duration::seconds(i as i64);
1746            store
1747                .put_object(
1748                    "b",
1749                    "k",
1750                    Some(*vid),
1751                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1752                    &m,
1753                )
1754                .unwrap();
1755        }
1756        // Append a delete marker on top.
1757        let mut dm = sample_meta("k", 0);
1758        dm.version_id = Some("dm1".to_string());
1759        dm.is_delete_marker = true;
1760        dm.last_modified = base + chrono::Duration::seconds(10);
1761        store
1762            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1763            .unwrap();
1764        let loaded = store.load().unwrap();
1765        let snap = loaded.buckets.get("b").unwrap();
1766        assert!(
1767            !snap.objects.contains_key("k"),
1768            "trailing delete marker must hide current object",
1769        );
1770        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1771    }
1772
1773    #[test]
1774    fn legacy_null_object_overridden_by_newer_versions() {
1775        // A pre-versioning null put followed by versioning-enabled puts must
1776        // see snap.objects reflect the newest live version, not the stale null.
1777        let tmp = TempDir::new().unwrap();
1778        let store = new_store(&tmp);
1779        store
1780            .put_bucket_meta(
1781                "b",
1782                &BucketMeta {
1783                    name: "b".to_string(),
1784                    ..Default::default()
1785                },
1786            )
1787            .unwrap();
1788        let base = chrono::Utc::now();
1789        let mut null_meta = sample_meta("k", 3);
1790        null_meta.last_modified = base;
1791        store
1792            .put_object(
1793                "b",
1794                "k",
1795                None,
1796                BodySource::Bytes(Bytes::from_static(b"old")),
1797                &null_meta,
1798            )
1799            .unwrap();
1800        // Enable versioning and put two versions, the second a delete.
1801        store
1802            .put_bucket_meta(
1803                "b",
1804                &BucketMeta {
1805                    name: "b".to_string(),
1806                    versioning: Some("Enabled".to_string()),
1807                    ..Default::default()
1808                },
1809            )
1810            .unwrap();
1811        let mut v1 = sample_meta("k", 3);
1812        v1.version_id = Some("v1".to_string());
1813        v1.last_modified = base + chrono::Duration::seconds(1);
1814        store
1815            .put_object(
1816                "b",
1817                "k",
1818                Some("v1"),
1819                BodySource::Bytes(Bytes::from_static(b"new")),
1820                &v1,
1821            )
1822            .unwrap();
1823        let mut v2 = sample_meta("k", 5);
1824        v2.version_id = Some("v2".to_string());
1825        v2.last_modified = base + chrono::Duration::seconds(2);
1826        store
1827            .put_object(
1828                "b",
1829                "k",
1830                Some("v2"),
1831                BodySource::Bytes(Bytes::from_static(b"newer")),
1832                &v2,
1833            )
1834            .unwrap();
1835        let loaded = store.load().unwrap();
1836        let snap = loaded.buckets.get("b").unwrap();
1837        let current = snap
1838            .objects
1839            .get("k")
1840            .expect("latest live version must override stale null");
1841        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1842        assert_eq!(current.meta.size, 5);
1843    }
1844
1845    #[test]
1846    fn legacy_null_hidden_by_trailing_delete_marker() {
1847        let tmp = TempDir::new().unwrap();
1848        let store = new_store(&tmp);
1849        store
1850            .put_bucket_meta(
1851                "b",
1852                &BucketMeta {
1853                    name: "b".to_string(),
1854                    ..Default::default()
1855                },
1856            )
1857            .unwrap();
1858        let base = chrono::Utc::now();
1859        let mut null_meta = sample_meta("k", 3);
1860        null_meta.last_modified = base;
1861        store
1862            .put_object(
1863                "b",
1864                "k",
1865                None,
1866                BodySource::Bytes(Bytes::from_static(b"old")),
1867                &null_meta,
1868            )
1869            .unwrap();
1870        store
1871            .put_bucket_meta(
1872                "b",
1873                &BucketMeta {
1874                    name: "b".to_string(),
1875                    versioning: Some("Enabled".to_string()),
1876                    ..Default::default()
1877                },
1878            )
1879            .unwrap();
1880        let mut v1 = sample_meta("k", 3);
1881        v1.version_id = Some("v1".to_string());
1882        v1.last_modified = base + chrono::Duration::seconds(1);
1883        store
1884            .put_object(
1885                "b",
1886                "k",
1887                Some("v1"),
1888                BodySource::Bytes(Bytes::from_static(b"new")),
1889                &v1,
1890            )
1891            .unwrap();
1892        let mut dm = sample_meta("k", 0);
1893        dm.version_id = Some("dm1".to_string());
1894        dm.is_delete_marker = true;
1895        dm.last_modified = base + chrono::Duration::seconds(2);
1896        store
1897            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1898            .unwrap();
1899        let loaded = store.load().unwrap();
1900        let snap = loaded.buckets.get("b").unwrap();
1901        assert!(
1902            !snap.objects.contains_key("k"),
1903            "trailing delete marker must hide even a legacy null object",
1904        );
1905    }
1906
1907    #[test]
1908    fn delete_marker_roundtrip_no_body_file() {
1909        let tmp = TempDir::new().unwrap();
1910        let store = new_store(&tmp);
1911        store
1912            .put_bucket_meta(
1913                "b",
1914                &BucketMeta {
1915                    name: "b".to_string(),
1916                    versioning: Some("Enabled".to_string()),
1917                    ..Default::default()
1918                },
1919            )
1920            .unwrap();
1921        let mut m = sample_meta("k", 0);
1922        m.version_id = Some("dm1".to_string());
1923        m.is_delete_marker = true;
1924        store
1925            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1926            .unwrap();
1927        // No .bin file on disk for delete markers.
1928        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1929        assert!(!bin.exists(), "delete marker must not have a .bin file");
1930        assert!(toml_path.exists());
1931
1932        let loaded = store.load().unwrap();
1933        let versions = loaded
1934            .buckets
1935            .get("b")
1936            .unwrap()
1937            .object_versions
1938            .get("k")
1939            .unwrap();
1940        assert_eq!(versions.len(), 1);
1941        assert!(versions[0].meta.is_delete_marker);
1942        match &versions[0].body {
1943            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1944            _ => panic!("delete marker body should be empty Memory"),
1945        }
1946    }
1947
1948    #[test]
1949    fn mixed_nonversioned_and_versioned_buckets() {
1950        let tmp = TempDir::new().unwrap();
1951        let store = new_store(&tmp);
1952        store
1953            .put_bucket_meta(
1954                "a",
1955                &BucketMeta {
1956                    name: "a".to_string(),
1957                    ..Default::default()
1958                },
1959            )
1960            .unwrap();
1961        store
1962            .put_bucket_meta(
1963                "b",
1964                &BucketMeta {
1965                    name: "b".to_string(),
1966                    versioning: Some("Enabled".to_string()),
1967                    ..Default::default()
1968                },
1969            )
1970            .unwrap();
1971        let ma = sample_meta("only", 3);
1972        store
1973            .put_object(
1974                "a",
1975                "only",
1976                None,
1977                BodySource::Bytes(Bytes::from_static(b"aaa")),
1978                &ma,
1979            )
1980            .unwrap();
1981        let base = chrono::Utc::now();
1982        for (i, vid) in ["v1", "v2"].iter().enumerate() {
1983            let mut m = sample_meta("twice", 2);
1984            m.version_id = Some((*vid).to_string());
1985            m.last_modified = base + chrono::Duration::seconds(i as i64);
1986            store
1987                .put_object(
1988                    "b",
1989                    "twice",
1990                    Some(*vid),
1991                    BodySource::Bytes(Bytes::from_static(b"xx")),
1992                    &m,
1993                )
1994                .unwrap();
1995        }
1996        let loaded = store.load().unwrap();
1997        assert_eq!(loaded.buckets.len(), 2);
1998        let a = loaded.buckets.get("a").unwrap();
1999        assert_eq!(a.objects.len(), 1);
2000        assert!(a.object_versions.is_empty());
2001        let b = loaded.buckets.get("b").unwrap();
2002        // Fix #5: the latest live version is promoted into objects so
2003        // unversioned GETs see it post-restart.
2004        assert_eq!(
2005            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2006            Some("v2")
2007        );
2008        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2009    }
2010
2011    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2012        MpuInit {
2013            upload_id: upload_id.to_string(),
2014            key: key.to_string(),
2015            initiated: chrono::Utc::now(),
2016            content_type: "application/octet-stream".to_string(),
2017            storage_class: "STANDARD".to_string(),
2018            ..Default::default()
2019        }
2020    }
2021
2022    #[test]
2023    fn mpu_create_then_load_empty_parts() {
2024        let tmp = TempDir::new().unwrap();
2025        let store = new_store(&tmp);
2026        store
2027            .put_bucket_meta(
2028                "b",
2029                &BucketMeta {
2030                    name: "b".to_string(),
2031                    ..Default::default()
2032                },
2033            )
2034            .unwrap();
2035        let init = sample_mpu_init("up1", "k1");
2036        store.mpu_create("b", "up1", &init).unwrap();
2037        let loaded = store.load().unwrap();
2038        let snap = loaded.buckets.get("b").unwrap();
2039        let m = snap.multipart_uploads.get("up1").expect("upload present");
2040        assert_eq!(m.init.upload_id, "up1");
2041        assert_eq!(m.init.key, "k1");
2042        assert!(m.parts.is_empty());
2043    }
2044
2045    #[test]
2046    fn mpu_put_part_then_load_three_parts() {
2047        let tmp = TempDir::new().unwrap();
2048        let store = new_store(&tmp);
2049        store
2050            .put_bucket_meta(
2051                "b",
2052                &BucketMeta {
2053                    name: "b".to_string(),
2054                    ..Default::default()
2055                },
2056            )
2057            .unwrap();
2058        store
2059            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2060            .unwrap();
2061        let bodies = [
2062            Bytes::from_static(b"part-one"),
2063            Bytes::from_static(b"part-two-longer"),
2064            Bytes::from_static(b"p3"),
2065        ];
2066        for (i, body) in bodies.iter().enumerate() {
2067            let n = (i + 1) as u32;
2068            store
2069                .mpu_put_part(
2070                    "b",
2071                    "up",
2072                    n,
2073                    BodySource::Bytes(body.clone()),
2074                    &format!("et{}", n),
2075                )
2076                .unwrap();
2077        }
2078        let loaded = store.load().unwrap();
2079        let snap = loaded.buckets.get("b").unwrap();
2080        let m = snap.multipart_uploads.get("up").unwrap();
2081        assert_eq!(m.parts.len(), 3);
2082        for (i, body) in bodies.iter().enumerate() {
2083            let n = (i + 1) as u32;
2084            let part = m.parts.get(&n).unwrap();
2085            assert_eq!(part.meta.part_number, n);
2086            assert_eq!(part.meta.size, body.len() as u64);
2087            assert_eq!(part.meta.etag, format!("et{}", n));
2088            match &part.body {
2089                BodyRef::Disk { path, size, .. } => {
2090                    assert_eq!(*size, body.len() as u64);
2091                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2092                }
2093                _ => panic!("expected Disk"),
2094            }
2095        }
2096    }
2097
2098    #[test]
2099    fn mpu_abort_removes_upload() {
2100        let tmp = TempDir::new().unwrap();
2101        let store = new_store(&tmp);
2102        store
2103            .put_bucket_meta(
2104                "b",
2105                &BucketMeta {
2106                    name: "b".to_string(),
2107                    ..Default::default()
2108                },
2109            )
2110            .unwrap();
2111        store
2112            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2113            .unwrap();
2114        store
2115            .mpu_put_part(
2116                "b",
2117                "up",
2118                1,
2119                BodySource::Bytes(Bytes::from_static(b"x")),
2120                "e",
2121            )
2122            .unwrap();
2123        store.mpu_abort("b", "up").unwrap();
2124        assert!(!store.mpu_dir("b", "up").exists());
2125        // Idempotent.
2126        store.mpu_abort("b", "up").unwrap();
2127        let loaded = store.load().unwrap();
2128        let snap = loaded.buckets.get("b").unwrap();
2129        assert!(snap.multipart_uploads.is_empty());
2130    }
2131
2132    #[test]
2133    fn mpu_complete_single_part_fast_path() {
2134        let tmp = TempDir::new().unwrap();
2135        let store = new_store(&tmp);
2136        store
2137            .put_bucket_meta(
2138                "b",
2139                &BucketMeta {
2140                    name: "b".to_string(),
2141                    ..Default::default()
2142                },
2143            )
2144            .unwrap();
2145        store
2146            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2147            .unwrap();
2148        let body = Bytes::from_static(b"only-part-bytes");
2149        store
2150            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2151            .unwrap();
2152        let meta = sample_meta("k", body.len() as u64);
2153        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2154        match &body_ref {
2155            BodyRef::Disk { path, size, .. } => {
2156                assert_eq!(*size, body.len() as u64);
2157                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2158            }
2159            _ => panic!("expected Disk"),
2160        }
2161        assert!(!store.mpu_dir("b", "up").exists());
2162    }
2163
2164    #[test]
2165    fn mpu_complete_multi_part_concat() {
2166        let tmp = TempDir::new().unwrap();
2167        let store = new_store(&tmp);
2168        store
2169            .put_bucket_meta(
2170                "b",
2171                &BucketMeta {
2172                    name: "b".to_string(),
2173                    ..Default::default()
2174                },
2175            )
2176            .unwrap();
2177        store
2178            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2179            .unwrap();
2180        let p1 = Bytes::from_static(b"AAAA");
2181        let p2 = Bytes::from_static(b"BBBBBB");
2182        let p3 = Bytes::from_static(b"CC");
2183        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2184            store
2185                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2186                .unwrap();
2187        }
2188        let mut expected = Vec::new();
2189        expected.extend_from_slice(&p1);
2190        expected.extend_from_slice(&p2);
2191        expected.extend_from_slice(&p3);
2192        let meta = sample_meta("k", expected.len() as u64);
2193        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2194        let path = match body_ref {
2195            BodyRef::Disk { path, size, .. } => {
2196                assert_eq!(size, expected.len() as u64);
2197                path
2198            }
2199            _ => panic!("expected Disk"),
2200        };
2201        assert_eq!(std::fs::read(&path).unwrap(), expected);
2202        assert!(!store.mpu_dir("b", "up").exists());
2203    }
2204
2205    #[test]
2206    fn mpu_complete_large_streaming_via_file_source() {
2207        let tmp = TempDir::new().unwrap();
2208        let store = new_store(&tmp);
2209        store
2210            .put_bucket_meta(
2211                "b",
2212                &BucketMeta {
2213                    name: "b".to_string(),
2214                    ..Default::default()
2215                },
2216            )
2217            .unwrap();
2218        store
2219            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2220            .unwrap();
2221
2222        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2223        // exercise the BodySource::File + streaming concat path).
2224        const PART_SIZE: usize = 1024 * 1024;
2225        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2226        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2227        for (i, byte) in patterns.iter().enumerate() {
2228            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2229            let data = vec![*byte; PART_SIZE];
2230            std::fs::write(&src, &data).unwrap();
2231            expected.extend_from_slice(&data);
2232            store
2233                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2234                .unwrap();
2235        }
2236        let meta = sample_meta("k", expected.len() as u64);
2237        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2238        let path = match body_ref {
2239            BodyRef::Disk { path, size, .. } => {
2240                assert_eq!(size, expected.len() as u64);
2241                path
2242            }
2243            _ => panic!("expected Disk"),
2244        };
2245        let actual = std::fs::read(&path).unwrap();
2246        assert_eq!(actual.len(), expected.len());
2247        assert_eq!(actual, expected);
2248        assert!(!store.mpu_dir("b", "up").exists());
2249    }
2250
2251    #[test]
2252    fn mpu_resumable_across_load() {
2253        let tmp = TempDir::new().unwrap();
2254        let store = new_store(&tmp);
2255        store
2256            .put_bucket_meta(
2257                "b",
2258                &BucketMeta {
2259                    name: "b".to_string(),
2260                    ..Default::default()
2261                },
2262            )
2263            .unwrap();
2264        store
2265            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2266            .unwrap();
2267        let p1 = Bytes::from_static(b"hello-");
2268        let p2 = Bytes::from_static(b"world-");
2269        store
2270            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2271            .unwrap();
2272        store
2273            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2274            .unwrap();
2275
2276        // Simulate a restart: re-open the store on the same dir and load.
2277        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2278        let loaded = store2.load().unwrap();
2279        let snap = loaded.buckets.get("b").unwrap();
2280        let m = snap.multipart_uploads.get("up").unwrap();
2281        assert_eq!(m.parts.len(), 2);
2282        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2283        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2284
2285        // Continue the upload on the fresh store.
2286        let p3 = Bytes::from_static(b"again!");
2287        store2
2288            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2289            .unwrap();
2290        let mut expected = Vec::new();
2291        expected.extend_from_slice(&p1);
2292        expected.extend_from_slice(&p2);
2293        expected.extend_from_slice(&p3);
2294        let meta = sample_meta("k", expected.len() as u64);
2295        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2296        let path = match body_ref {
2297            BodyRef::Disk { path, .. } => path,
2298            _ => panic!(),
2299        };
2300        assert_eq!(std::fs::read(&path).unwrap(), expected);
2301        assert!(!store2.mpu_dir("b", "up").exists());
2302    }
2303
2304    #[test]
2305    fn tags_snapshot_roundtrip_via_store() {
2306        let tmp = TempDir::new().unwrap();
2307        let store = new_store(&tmp);
2308        store
2309            .put_bucket_meta(
2310                "b",
2311                &BucketMeta {
2312                    name: "b".to_string(),
2313                    ..Default::default()
2314                },
2315            )
2316            .unwrap();
2317        let mut tags = BTreeMap::new();
2318        tags.insert("env".to_string(), "prod".to_string());
2319        tags.insert("team".to_string(), "s3".to_string());
2320        let snap = TagsSnapshot { tags: tags.clone() };
2321        let payload = toml::to_string(&snap).unwrap();
2322        store
2323            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2324            .unwrap();
2325        let loaded = store.load().unwrap();
2326        let text = loaded
2327            .buckets
2328            .get("b")
2329            .unwrap()
2330            .subresources
2331            .get("tags.toml")
2332            .cloned()
2333            .unwrap();
2334        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2335        assert_eq!(decoded.tags, tags);
2336    }
2337
2338    #[test]
2339    fn acl_snapshot_roundtrip_via_store() {
2340        let tmp = TempDir::new().unwrap();
2341        let store = new_store(&tmp);
2342        store
2343            .put_bucket_meta(
2344                "b",
2345                &BucketMeta {
2346                    name: "b".to_string(),
2347                    ..Default::default()
2348                },
2349            )
2350            .unwrap();
2351        let snap = AclSnapshot {
2352            owner_id: "owner-abc".to_string(),
2353            grants: vec![
2354                AclGrantSnapshot {
2355                    grantee_type: "CanonicalUser".to_string(),
2356                    grantee_id: Some("owner-abc".to_string()),
2357                    grantee_display_name: Some("owner".to_string()),
2358                    grantee_uri: None,
2359                    permission: "FULL_CONTROL".to_string(),
2360                },
2361                AclGrantSnapshot {
2362                    grantee_type: "Group".to_string(),
2363                    grantee_id: None,
2364                    grantee_display_name: None,
2365                    grantee_uri: Some(
2366                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2367                    ),
2368                    permission: "READ".to_string(),
2369                },
2370            ],
2371        };
2372        let payload = toml::to_string(&snap).unwrap();
2373        store
2374            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2375            .unwrap();
2376        let loaded = store.load().unwrap();
2377        let text = loaded
2378            .buckets
2379            .get("b")
2380            .unwrap()
2381            .subresources
2382            .get("acl.toml")
2383            .cloned()
2384            .unwrap();
2385        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2386        assert_eq!(decoded.owner_id, "owner-abc");
2387        assert_eq!(decoded.grants.len(), 2);
2388        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2389        assert_eq!(decoded.grants[1].grantee_type, "Group");
2390    }
2391
2392    #[test]
2393    fn inventory_snapshot_roundtrip_via_store() {
2394        let tmp = TempDir::new().unwrap();
2395        let store = new_store(&tmp);
2396        store
2397            .put_bucket_meta(
2398                "b",
2399                &BucketMeta {
2400                    name: "b".to_string(),
2401                    ..Default::default()
2402                },
2403            )
2404            .unwrap();
2405        let mut configs = BTreeMap::new();
2406        configs.insert(
2407            "inv-1".to_string(),
2408            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2409        );
2410        configs.insert(
2411            "inv-2".to_string(),
2412            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2413        );
2414        let snap = InventorySnapshot {
2415            configs: configs.clone(),
2416        };
2417        let payload = toml::to_string(&snap).unwrap();
2418        store
2419            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2420            .unwrap();
2421        let loaded = store.load().unwrap();
2422        let text = loaded
2423            .buckets
2424            .get("b")
2425            .unwrap()
2426            .subresources
2427            .get("inventory.toml")
2428            .cloned()
2429            .unwrap();
2430        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2431        assert_eq!(decoded.configs, configs);
2432    }
2433
2434    #[test]
2435    fn legacy_versioning_file_is_read() {
2436        let tmp = TempDir::new().unwrap();
2437        let store = new_store(&tmp);
2438        // Bucket meta with no versioning field set.
2439        store
2440            .put_bucket_meta(
2441                "b",
2442                &BucketMeta {
2443                    name: "b".to_string(),
2444                    ..Default::default()
2445                },
2446            )
2447            .unwrap();
2448        // Legacy sidecar: a bare versioning.toml with "Enabled".
2449        let path = store.bucket_dir("b").join("versioning.toml");
2450        std::fs::write(&path, "Enabled").unwrap();
2451        let loaded = store.load().unwrap();
2452        let snap = loaded.buckets.get("b").unwrap();
2453        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2454    }
2455
2456    #[test]
2457    fn load_skips_corrupt_bucket_and_keeps_others() {
2458        // One corrupt bucket must be skipped with a warning, not abort the
2459        // whole store load (bug-audit 2026-06-20, 4.3).
2460        let tmp = TempDir::new().unwrap();
2461        let store = new_store(&tmp);
2462        store
2463            .put_bucket_meta(
2464                "good",
2465                &BucketMeta {
2466                    name: "good".to_string(),
2467                    ..Default::default()
2468                },
2469            )
2470            .unwrap();
2471        store
2472            .put_object(
2473                "good",
2474                "k",
2475                None,
2476                BodySource::Bytes(Bytes::from_static(b"hi")),
2477                &sample_meta("k", 2),
2478            )
2479            .unwrap();
2480
2481        let bad_dir = store.buckets_dir().join("bad");
2482        std::fs::create_dir_all(&bad_dir).unwrap();
2483        std::fs::write(bad_dir.join("meta.toml"), b"not valid toml = = =").unwrap();
2484
2485        let loaded = store.load().unwrap();
2486        assert!(
2487            loaded.buckets.contains_key("good"),
2488            "valid bucket must load"
2489        );
2490        assert!(
2491            !loaded.buckets.contains_key("bad"),
2492            "corrupt bucket must be skipped, not abort the load"
2493        );
2494    }
2495}