// fakecloud_persistence/s3.rs

use std::collections::BTreeMap;
use std::path::PathBuf;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;

9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11    #[serde(default)]
12    pub account_id: String,
13    #[serde(default)]
14    pub region: String,
15    #[serde(default)]
16    pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21    pub meta: BucketMeta,
22    #[serde(default)]
23    pub objects: BTreeMap<String, LoadedObject>,
24    #[serde(default)]
25    pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
26    #[serde(default)]
27    pub subresources: BTreeMap<String, String>,
28    #[serde(default)]
29    pub multipart_uploads: BTreeMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34    pub init: MpuInit,
35    #[serde(default)]
36    pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41    pub meta: UploadPartMeta,
42    #[serde(default)]
43    pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48    pub meta: ObjectMeta,
49    #[serde(default)]
50    pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55    #[serde(default)]
56    pub name: String,
57    #[serde(default = "default_time")]
58    pub creation_date: DateTime<Utc>,
59    #[serde(default)]
60    pub region: String,
61    #[serde(default)]
62    pub versioning: Option<String>,
63    #[serde(default)]
64    pub acl: Option<String>,
65    #[serde(default)]
66    pub acl_owner_id: String,
67    #[serde(default)]
68    pub accelerate_status: Option<String>,
69    #[serde(default)]
70    pub eventbridge_enabled: bool,
71    #[serde(default)]
72    pub lifecycle_transition_default_min_size: Option<String>,
73}
74
75fn default_time() -> DateTime<Utc> {
76    DateTime::<Utc>::MIN_UTC
77}
78
79#[derive(Debug, Default, Clone, Serialize, Deserialize)]
80pub struct AclGrantSnapshot {
81    pub grantee_type: String,
82    #[serde(default)]
83    pub grantee_id: Option<String>,
84    #[serde(default)]
85    pub grantee_display_name: Option<String>,
86    #[serde(default)]
87    pub grantee_uri: Option<String>,
88    pub permission: String,
89}
90
91#[derive(Debug, Default, Clone, Serialize, Deserialize)]
92pub struct TagsSnapshot {
93    #[serde(default)]
94    pub tags: BTreeMap<String, String>,
95}
96
97#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct AclSnapshot {
99    #[serde(default)]
100    pub owner_id: String,
101    #[serde(default)]
102    pub grants: Vec<AclGrantSnapshot>,
103}
104
105#[derive(Debug, Default, Clone, Serialize, Deserialize)]
106pub struct InventorySnapshot {
107    #[serde(default)]
108    pub configs: BTreeMap<String, String>,
109}
110
111#[derive(Debug, Default, Clone, Serialize, Deserialize)]
112pub struct ObjectMeta {
113    #[serde(default)]
114    pub key: String,
115    #[serde(default)]
116    pub content_type: String,
117    #[serde(default)]
118    pub etag: String,
119    #[serde(default)]
120    pub size: u64,
121    #[serde(default = "default_time")]
122    pub last_modified: DateTime<Utc>,
123    #[serde(default)]
124    pub metadata: BTreeMap<String, String>,
125    #[serde(default)]
126    pub tags: BTreeMap<String, String>,
127    #[serde(default)]
128    pub storage_class: String,
129    #[serde(default)]
130    pub acl_grants: Vec<AclGrantSnapshot>,
131    #[serde(default)]
132    pub acl_owner_id: Option<String>,
133    #[serde(default)]
134    pub parts_count: Option<u32>,
135    #[serde(default)]
136    pub part_sizes: Option<Vec<(u32, u64)>>,
137    #[serde(default)]
138    pub sse_algorithm: Option<String>,
139    #[serde(default)]
140    pub sse_kms_key_id: Option<String>,
141    #[serde(default)]
142    pub bucket_key_enabled: Option<bool>,
143    #[serde(default)]
144    pub version_id: Option<String>,
145    #[serde(default)]
146    pub is_delete_marker: bool,
147    #[serde(default)]
148    pub restore_ongoing: Option<bool>,
149    #[serde(default)]
150    pub restore_expiry: Option<String>,
151    #[serde(default)]
152    pub checksum_algorithm: Option<String>,
153    #[serde(default)]
154    pub checksum_value: Option<String>,
155    #[serde(default)]
156    pub lock_mode: Option<String>,
157    #[serde(default)]
158    pub lock_retain_until: Option<DateTime<Utc>>,
159    #[serde(default)]
160    pub lock_legal_hold: Option<String>,
161    #[serde(default)]
162    pub content_encoding: Option<String>,
163    #[serde(default)]
164    pub website_redirect_location: Option<String>,
165}
166
167#[derive(Debug, Default, Clone, Serialize, Deserialize)]
168pub struct MpuInit {
169    pub upload_id: String,
170    pub key: String,
171    #[serde(default = "default_time")]
172    pub initiated: DateTime<Utc>,
173    #[serde(default)]
174    pub metadata: BTreeMap<String, String>,
175    #[serde(default)]
176    pub content_type: String,
177    #[serde(default)]
178    pub storage_class: String,
179    #[serde(default)]
180    pub sse_algorithm: Option<String>,
181    #[serde(default)]
182    pub sse_kms_key_id: Option<String>,
183    #[serde(default)]
184    pub tagging: Option<String>,
185    #[serde(default)]
186    pub acl_grants: Vec<AclGrantSnapshot>,
187    #[serde(default)]
188    pub checksum_algorithm: Option<String>,
189}
190
191#[derive(Debug, Default, Clone, Serialize, Deserialize)]
192pub struct UploadPartMeta {
193    pub part_number: u32,
194    pub etag: String,
195    pub size: u64,
196    #[serde(default = "default_time")]
197    pub last_modified: DateTime<Utc>,
198}
199
/// Kinds of bucket-level configuration that
/// [`S3Store::put_bucket_subresource`] persists separately from
/// [`BucketMeta`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}

/// Every [`BucketSubresource`] variant, in declaration order.
///
/// This list is maintained by hand: when a variant is added to the enum, add
/// it here too, or stores that iterate this slice will silently skip it.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = {
    use BucketSubresource::*;
    &[
        Tags,
        Lifecycle,
        Cors,
        Policy,
        Notification,
        Logging,
        Website,
        PublicAccessBlock,
        ObjectLock,
        Replication,
        Ownership,
        Inventory,
        Encryption,
        Versioning,
        Acl,
        Accelerate,
        Analytics,
        IntelligentTiering,
        Metrics,
        RequestPayment,
        Abac,
        MetadataConfiguration,
        MetadataTableConfiguration,
    ]
};

253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub enum BodyRef {
255    #[serde(skip)]
256    Memory(Bytes),
257    Disk {
258        bucket: String,
259        key: String,
260        #[serde(default)]
261        version: Option<String>,
262        path: PathBuf,
263        size: u64,
264    },
265}
266
267impl BodyRef {
268    pub fn size(&self) -> u64 {
269        match self {
270            BodyRef::Memory(b) => b.len() as u64,
271            BodyRef::Disk { size, .. } => *size,
272        }
273    }
274}
275
276impl Default for BodyRef {
277    fn default() -> Self {
278        BodyRef::Memory(Bytes::new())
279    }
280}
281
282#[derive(Debug)]
283pub enum BodySource {
284    Bytes(Bytes),
285    /// Existing disk path that should be *moved* into the destination via
286    /// rename (upload-tmp → final) and consumed.
287    File(PathBuf),
288    /// Existing disk path that should be *copied* into the destination and
289    /// left in place. Used by replication so the source object stays
290    /// available while the replica is produced.
291    FileCopy(PathBuf),
292}
293
294/// Materialize a [`BodySource`] into [`Bytes`]. The `File` variant is
295/// consumed (file is unlinked after read); `FileCopy` is left in place.
296/// Memory-mode stores call this to absorb tempfiles produced by the
297/// streaming dispatch path so streaming and non-streaming services share
298/// the same wire-shape regardless of `--storage-mode`.
299fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
300    match body {
301        BodySource::Bytes(b) => Ok(b),
302        BodySource::File(p) => {
303            let bytes = std::fs::read(&p)?;
304            // Best-effort: a leftover tempfile is not a correctness
305            // problem so the read result wins over the unlink result.
306            let _ = std::fs::remove_file(&p);
307            Ok(Bytes::from(bytes))
308        }
309        BodySource::FileCopy(p) => {
310            let bytes = std::fs::read(&p)?;
311            Ok(Bytes::from(bytes))
312        }
313    }
314}
315
316#[derive(Debug, Error)]
317pub enum StoreError {
318    #[error("io error: {0}")]
319    Io(#[from] std::io::Error),
320    #[error("serialization error: {0}")]
321    Serde(String),
322    #[error("not supported by this store")]
323    NotSupported,
324    #[error("{0}")]
325    Other(String),
326}
327
328pub type StoreResult<T> = Result<T, StoreError>;
329
330pub trait S3Store: Send + Sync {
331    fn load(&self) -> StoreResult<S3State>;
332
333    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
334    fn put_bucket_subresource(
335        &self,
336        bucket: &str,
337        kind: BucketSubresource,
338        payload: &str,
339    ) -> StoreResult<()>;
340    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
341    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
342
343    fn put_object(
344        &self,
345        bucket: &str,
346        key: &str,
347        version: Option<&str>,
348        body: BodySource,
349        meta: &ObjectMeta,
350    ) -> StoreResult<BodyRef>;
351    fn put_object_meta(
352        &self,
353        bucket: &str,
354        key: &str,
355        version: Option<&str>,
356        meta: &ObjectMeta,
357    ) -> StoreResult<()>;
358    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
359    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
360
361    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
362    /// Persist a multipart-upload part body. Returns the [`BodyRef`] the
363    /// service should keep in its in-memory part map: `BodyRef::Disk`
364    /// for disk-backed stores (so subsequent reads stream from the
365    /// `.bin` file instead of holding the part in RAM), or
366    /// `BodyRef::Memory` for memory-mode stores.
367    fn mpu_put_part(
368        &self,
369        bucket: &str,
370        upload_id: &str,
371        part_number: u32,
372        body: BodySource,
373        etag: &str,
374    ) -> StoreResult<BodyRef>;
375    /// Where the streaming dispatch path should spool large request
376    /// bodies to disk before handing them to [`Self::put_object`] /
377    /// [`Self::mpu_put_part`]. Returning `Some` keeps the spool file on
378    /// the same filesystem as the final destination so `rename(2)` is a
379    /// metadata-only move; returning `None` means "use the system temp
380    /// dir, the store will read the file back into RAM and unlink it".
381    fn spool_dir(&self) -> Option<std::path::PathBuf> {
382        None
383    }
384    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
385    fn mpu_complete(
386        &self,
387        bucket: &str,
388        upload_id: &str,
389        final_key: &str,
390        version: Option<&str>,
391        meta: &ObjectMeta,
392    ) -> StoreResult<BodyRef>;
393}
394
/// A stateless [`S3Store`] for memory mode: nothing is written anywhere, and
/// loading always yields an empty [`S3State`].
#[derive(Default)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Create the memory store (equivalent to `MemoryS3Store::default()`).
    pub fn new() -> Self {
        Self::default()
    }
}

409impl S3Store for MemoryS3Store {
410    fn load(&self) -> StoreResult<S3State> {
411        Ok(S3State::default())
412    }
413
414    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
415        Ok(())
416    }
417    fn put_bucket_subresource(
418        &self,
419        _bucket: &str,
420        _kind: BucketSubresource,
421        _payload: &str,
422    ) -> StoreResult<()> {
423        Ok(())
424    }
425    fn delete_bucket_subresource(
426        &self,
427        _bucket: &str,
428        _kind: BucketSubresource,
429    ) -> StoreResult<()> {
430        Ok(())
431    }
432    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
433        Ok(())
434    }
435
436    fn put_object(
437        &self,
438        _bucket: &str,
439        _key: &str,
440        _version: Option<&str>,
441        body: BodySource,
442        _meta: &ObjectMeta,
443    ) -> StoreResult<BodyRef> {
444        Ok(BodyRef::Memory(read_body_source(body)?))
445    }
446    fn put_object_meta(
447        &self,
448        _bucket: &str,
449        _key: &str,
450        _version: Option<&str>,
451        _meta: &ObjectMeta,
452    ) -> StoreResult<()> {
453        Ok(())
454    }
455    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
456        Ok(())
457    }
458    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
459        match body {
460            BodyRef::Memory(b) => Ok(b.clone()),
461            BodyRef::Disk { .. } => {
462                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
463            }
464        }
465    }
466
467    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
468        Ok(())
469    }
470    fn mpu_put_part(
471        &self,
472        _bucket: &str,
473        _upload_id: &str,
474        _part_number: u32,
475        body: BodySource,
476        _etag: &str,
477    ) -> StoreResult<BodyRef> {
478        // Memory mode keeps all part bodies in RAM. Absorb any tempfile
479        // produced by the streaming dispatch path here.
480        Ok(BodyRef::Memory(read_body_source(body)?))
481    }
482    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
483        Ok(())
484    }
485    fn mpu_complete(
486        &self,
487        _bucket: &str,
488        _upload_id: &str,
489        _final_key: &str,
490        _version: Option<&str>,
491        _meta: &ObjectMeta,
492    ) -> StoreResult<BodyRef> {
493        Ok(BodyRef::Memory(Bytes::new()))
494    }
495}
496
497pub struct DiskS3Store {
498    root: PathBuf,
499    cache: std::sync::Arc<crate::cache::BodyCache>,
500}
501
502impl DiskS3Store {
503    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
504        Self { root, cache }
505    }
506
507    fn buckets_dir(&self) -> PathBuf {
508        self.root.join("buckets")
509    }
510
511    fn bucket_dir(&self, bucket: &str) -> PathBuf {
512        self.buckets_dir()
513            .join(crate::key_escape::escape_key_segment(bucket))
514    }
515
516    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
517        self.bucket_dir(bucket)
518            .join("objects")
519            .join(crate::key_escape::escape_key_segment(key))
520    }
521
522    fn version_tag(version: Option<&str>) -> String {
523        version.unwrap_or("null").to_string()
524    }
525
526    fn object_paths(
527        &self,
528        bucket: &str,
529        key: &str,
530        version: Option<&str>,
531    ) -> (PathBuf, PathBuf, PathBuf) {
532        let dir = self.object_dir(bucket, key);
533        let tag = Self::version_tag(version);
534        let bin = dir.join(format!("{}.bin", tag));
535        let toml = dir.join(format!("{}.toml", tag));
536        (dir, bin, toml)
537    }
538
539    fn subresource_filename(kind: BucketSubresource) -> &'static str {
540        match kind {
541            BucketSubresource::Tags => "tags.toml",
542            BucketSubresource::Lifecycle => "lifecycle.toml",
543            BucketSubresource::Cors => "cors.toml",
544            BucketSubresource::Policy => "policy.toml",
545            BucketSubresource::Notification => "notification.toml",
546            BucketSubresource::Logging => "logging.toml",
547            BucketSubresource::Website => "website.toml",
548            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
549            BucketSubresource::ObjectLock => "object_lock.toml",
550            BucketSubresource::Replication => "replication.toml",
551            BucketSubresource::Ownership => "ownership.toml",
552            BucketSubresource::Inventory => "inventory.toml",
553            BucketSubresource::Encryption => "encryption.toml",
554            BucketSubresource::Versioning => "versioning.toml",
555            BucketSubresource::Acl => "acl.toml",
556            BucketSubresource::Accelerate => "accelerate.toml",
557            BucketSubresource::Analytics => "analytics.toml",
558            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
559            BucketSubresource::Metrics => "metrics.toml",
560            BucketSubresource::RequestPayment => "request_payment.toml",
561            BucketSubresource::Abac => "abac.toml",
562            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
563            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
564        }
565    }
566
567    fn cleanup_empty(dir: &std::path::Path) {
568        let _ = std::fs::remove_dir(dir);
569    }
570
571    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
572        self.bucket_dir(bucket)
573            .join("mpu")
574            .join(crate::key_escape::escape_key_segment(upload_id))
575    }
576
577    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
578        self.mpu_dir(bucket, upload_id).join("parts")
579    }
580
581    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
582        self.mpu_parts_dir(bucket, upload_id)
583            .join(format!("{}.bin", part_number))
584    }
585
586    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
587        self.mpu_parts_dir(bucket, upload_id)
588            .join(format!("{}.toml", part_number))
589    }
590
591    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
592        crate::cache::BodyKey::new(
593            bucket.to_string(),
594            format!("__mpu__/{}", upload_id),
595            Some(format!("part-{}", part_number)),
596        )
597    }
598}
599
600fn io_other(msg: impl Into<String>) -> StoreError {
601    StoreError::Other(msg.into())
602}
603
604impl S3Store for DiskS3Store {
605    fn load(&self) -> StoreResult<S3State> {
606        let mut state = S3State::default();
607        let buckets_dir = self.buckets_dir();
608        if !buckets_dir.exists() {
609            return Ok(state);
610        }
611        for entry in std::fs::read_dir(&buckets_dir)? {
612            let entry = entry?;
613            if !entry.file_type()?.is_dir() {
614                continue;
615            }
616            let bdir = entry.path();
617            let meta_path = bdir.join("meta.toml");
618            if !meta_path.exists() {
619                continue;
620            }
621            let meta_text = std::fs::read_to_string(&meta_path)?;
622            let mut meta: BucketMeta =
623                toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
624            let mut snap = BucketSnapshot {
625                meta: meta.clone(),
626                objects: BTreeMap::new(),
627                object_versions: BTreeMap::new(),
628                subresources: BTreeMap::new(),
629                multipart_uploads: BTreeMap::new(),
630            };
631
632            for kind in ALL_SUBRESOURCES {
633                let fname = Self::subresource_filename(*kind);
634                let path = bdir.join(fname);
635                if path.exists() {
636                    let text = std::fs::read_to_string(&path)?;
637                    if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
638                        let stripped = text.trim();
639                        if !stripped.is_empty() {
640                            snap.meta.versioning = Some(stripped.to_string());
641                            meta.versioning = snap.meta.versioning.clone();
642                        }
643                    }
644                    snap.subresources.insert(fname.to_string(), text);
645                }
646            }
647
648            let objects_root = bdir.join("objects");
649            if objects_root.exists() {
650                for okey_entry in std::fs::read_dir(&objects_root)? {
651                    let okey_entry = okey_entry?;
652                    if !okey_entry.file_type()?.is_dir() {
653                        continue;
654                    }
655                    let key_dir = okey_entry.path();
656                    let mut versioned: Vec<LoadedObject> = Vec::new();
657                    let mut key_name: Option<String> = None;
658                    for version_entry in std::fs::read_dir(&key_dir)? {
659                        let version_entry = version_entry?;
660                        let path = version_entry.path();
661                        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
662                            continue;
663                        };
664                        if !fname.ends_with(".toml") {
665                            continue;
666                        }
667                        let version_tag = &fname[..fname.len() - 5];
668                        let toml_text = std::fs::read_to_string(&path)?;
669                        let obj_meta: ObjectMeta = toml::from_str(&toml_text)
670                            .map_err(|e| StoreError::Serde(e.to_string()))?;
671                        let bin_path = key_dir.join(format!("{}.bin", version_tag));
672                        let (body, size) = if obj_meta.is_delete_marker {
673                            (BodyRef::Memory(Bytes::new()), 0u64)
674                        } else if bin_path.exists() {
675                            let sz = std::fs::metadata(&bin_path)?.len();
676                            (
677                                BodyRef::Disk {
678                                    bucket: meta.name.clone(),
679                                    key: obj_meta.key.clone(),
680                                    version: if version_tag == "null" {
681                                        None
682                                    } else {
683                                        Some(version_tag.to_string())
684                                    },
685                                    path: bin_path,
686                                    size: sz,
687                                },
688                                sz,
689                            )
690                        } else {
691                            // Fail loud: the sidecar says this object has a
692                            // body but the .bin file is missing. Returning
693                            // silently would hand the caller a truncated
694                            // object and hide data loss.
695                            return Err(StoreError::Other(format!(
696                                "missing body file: {}",
697                                bin_path.display()
698                            )));
699                        };
700                        let _ = size;
701                        key_name.get_or_insert_with(|| obj_meta.key.clone());
702                        if version_tag == "null" && obj_meta.version_id.is_none() {
703                            snap.objects.insert(
704                                obj_meta.key.clone(),
705                                LoadedObject {
706                                    meta: obj_meta,
707                                    body,
708                                },
709                            );
710                        } else {
711                            versioned.push(LoadedObject {
712                                meta: obj_meta,
713                                body,
714                            });
715                        }
716                    }
717                    if !versioned.is_empty() {
718                        versioned.sort_by_key(|v| v.meta.last_modified);
719                        if let Some(key) = key_name {
720                            // Reconcile snap.objects with the newest version:
721                            // a trailing delete marker hides any prior null or
722                            // live version (remove it); otherwise overwrite
723                            // with the newest live version, even if a
724                            // pre-versioning null.toml had already been
725                            // inserted during the non-versioned scan.
726                            match versioned.last() {
727                                Some(newest) if newest.meta.is_delete_marker => {
728                                    snap.objects.remove(&key);
729                                }
730                                Some(newest) => {
731                                    snap.objects.insert(key.clone(), newest.clone());
732                                }
733                                None => {}
734                            }
735                            snap.object_versions.insert(key, versioned);
736                        }
737                    }
738                }
739            }
740
741            let mpu_root = bdir.join("mpu");
742            if mpu_root.exists() {
743                for upload_entry in std::fs::read_dir(&mpu_root)? {
744                    let upload_entry = upload_entry?;
745                    if !upload_entry.file_type()?.is_dir() {
746                        continue;
747                    }
748                    let upload_dir = upload_entry.path();
749                    let init_path = upload_dir.join("init.toml");
750                    if !init_path.exists() {
751                        continue;
752                    }
753                    let init_text = std::fs::read_to_string(&init_path)?;
754                    let init: MpuInit =
755                        toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
756                    let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
757                    let parts_dir = upload_dir.join("parts");
758                    if parts_dir.exists() {
759                        for part_entry in std::fs::read_dir(&parts_dir)? {
760                            let part_entry = part_entry?;
761                            let path = part_entry.path();
762                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
763                                continue;
764                            };
765                            if !fname.ends_with(".toml") {
766                                continue;
767                            }
768                            let stem = &fname[..fname.len() - 5];
769                            let Ok(part_number) = stem.parse::<u32>() else {
770                                continue;
771                            };
772                            let toml_text = std::fs::read_to_string(&path)?;
773                            let part_meta: UploadPartMeta = toml::from_str(&toml_text)
774                                .map_err(|e| StoreError::Serde(e.to_string()))?;
775                            let bin_path = parts_dir.join(format!("{}.bin", part_number));
776                            if !bin_path.exists() {
777                                return Err(StoreError::Other(format!(
778                                    "missing multipart part body file: {}",
779                                    bin_path.display()
780                                )));
781                            }
782                            let sz = std::fs::metadata(&bin_path)?.len();
783                            let body = BodyRef::Disk {
784                                bucket: meta.name.clone(),
785                                key: format!("__mpu__/{}", init.upload_id),
786                                version: Some(format!("part-{}", part_number)),
787                                path: bin_path,
788                                size: sz,
789                            };
790                            loaded_parts.insert(
791                                part_number,
792                                LoadedPart {
793                                    meta: part_meta,
794                                    body,
795                                },
796                            );
797                        }
798                    }
799                    snap.multipart_uploads.insert(
800                        init.upload_id.clone(),
801                        LoadedMpu {
802                            init,
803                            parts: loaded_parts,
804                        },
805                    );
806                }
807            }
808
809            state.buckets.insert(meta.name.clone(), snap);
810        }
811        Ok(state)
812    }
813
814    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
815        let dir = self.bucket_dir(bucket);
816        std::fs::create_dir_all(&dir)?;
817        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
818        Ok(())
819    }
820
821    fn put_bucket_subresource(
822        &self,
823        bucket: &str,
824        kind: BucketSubresource,
825        payload: &str,
826    ) -> StoreResult<()> {
827        let dir = self.bucket_dir(bucket);
828        std::fs::create_dir_all(&dir)?;
829        let path = dir.join(Self::subresource_filename(kind));
830        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
831        Ok(())
832    }
833
834    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
835        let path = self
836            .bucket_dir(bucket)
837            .join(Self::subresource_filename(kind));
838        match std::fs::remove_file(&path) {
839            Ok(_) => Ok(()),
840            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
841            Err(e) => Err(e.into()),
842        }
843    }
844
845    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
846        let dir = self.bucket_dir(bucket);
847        match std::fs::remove_dir_all(&dir) {
848            Ok(_) => Ok(()),
849            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
850            Err(e) => Err(e.into()),
851        }
852    }
853
854    fn put_object(
855        &self,
856        bucket: &str,
857        key: &str,
858        version: Option<&str>,
859        body: BodySource,
860        meta: &ObjectMeta,
861    ) -> StoreResult<BodyRef> {
862        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
863        std::fs::create_dir_all(&dir)?;
864
865        if meta.is_delete_marker {
866            crate::atomic::write_atomic_toml(&toml_path, meta)?;
867            return Ok(BodyRef::Memory(Bytes::new()));
868        }
869
870        let size: u64;
871        let bytes_for_cache: Option<Bytes>;
872        match body {
873            BodySource::Bytes(b) => {
874                size = b.len() as u64;
875                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
876                bytes_for_cache = Some(b);
877            }
878            BodySource::File(src) => {
879                let src_size = std::fs::metadata(&src)?.len();
880                size = src_size;
881                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
882                bytes_for_cache = None;
883            }
884            BodySource::FileCopy(src) => {
885                let src_size = std::fs::metadata(&src)?.len();
886                size = src_size;
887                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
888                bytes_for_cache = None;
889            }
890        }
891
892        crate::atomic::write_atomic_toml(&toml_path, meta)?;
893
894        let body_key = crate::cache::BodyKey::new(
895            bucket.to_string(),
896            key.to_string(),
897            version.map(|s| s.to_string()),
898        );
899        if let Some(b) = bytes_for_cache {
900            self.cache.insert(body_key, b);
901        } else {
902            self.cache.invalidate(&crate::cache::BodyKey::new(
903                bucket.to_string(),
904                key.to_string(),
905                version.map(|s| s.to_string()),
906            ));
907        }
908
909        Ok(BodyRef::Disk {
910            bucket: bucket.to_string(),
911            key: key.to_string(),
912            version: version.map(|s| s.to_string()),
913            path: bin_path,
914            size,
915        })
916    }
917
918    fn put_object_meta(
919        &self,
920        bucket: &str,
921        key: &str,
922        version: Option<&str>,
923        meta: &ObjectMeta,
924    ) -> StoreResult<()> {
925        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
926        std::fs::create_dir_all(&dir)?;
927        crate::atomic::write_atomic_toml(&toml_path, meta)?;
928        Ok(())
929    }
930
931    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
932        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
933        for p in [&bin_path, &toml_path] {
934            match std::fs::remove_file(p) {
935                Ok(_) => {}
936                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
937                Err(e) => return Err(e.into()),
938            }
939        }
940        Self::cleanup_empty(&dir);
941
942        self.cache.invalidate(&crate::cache::BodyKey::new(
943            bucket.to_string(),
944            key.to_string(),
945            version.map(|s| s.to_string()),
946        ));
947        Ok(())
948    }
949
950    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
951        match body {
952            BodyRef::Memory(b) => Ok(b.clone()),
953            BodyRef::Disk {
954                bucket,
955                key,
956                version,
957                path,
958                size: _,
959            } => {
960                let body_key =
961                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
962                if let Some(bytes) = self.cache.get(&body_key) {
963                    return Ok(bytes);
964                }
965                let bytes = Bytes::from(std::fs::read(path)?);
966                self.cache.insert(body_key, bytes.clone());
967                Ok(bytes)
968            }
969        }
970    }
971
972    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
973        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
974        std::fs::create_dir_all(&parts_dir)?;
975        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
976        crate::atomic::write_atomic_toml(&init_path, init)?;
977        Ok(())
978    }
979
980    fn mpu_put_part(
981        &self,
982        bucket: &str,
983        upload_id: &str,
984        part_number: u32,
985        body: BodySource,
986        etag: &str,
987    ) -> StoreResult<BodyRef> {
988        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
989        std::fs::create_dir_all(&parts_dir)?;
990        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
991        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
992
993        let size: u64 = match body {
994            BodySource::Bytes(b) => {
995                let n = b.len() as u64;
996                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
997                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
998                self.cache.insert(cache_key, b);
999                n
1000            }
1001            BodySource::File(src) => {
1002                let n = std::fs::metadata(&src)?.len();
1003                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1004                self.cache
1005                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1006                n
1007            }
1008            BodySource::FileCopy(src) => {
1009                let n = std::fs::metadata(&src)?.len();
1010                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1011                self.cache
1012                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1013                n
1014            }
1015        };
1016
1017        let meta = UploadPartMeta {
1018            part_number,
1019            etag: etag.to_string(),
1020            size,
1021            last_modified: Utc::now(),
1022        };
1023        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1024        // Disk-mode parts live on disk; expose them as BodyRef::Disk so
1025        // the runtime state never holds the part body in RAM.
1026        Ok(BodyRef::Disk {
1027            bucket: bucket.to_string(),
1028            key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1029            version: None,
1030            path: bin_path,
1031            size,
1032        })
1033    }
1034
1035    fn spool_dir(&self) -> Option<std::path::PathBuf> {
1036        let dir = self.root.join(".spool");
1037        // Best-effort: create lazily; the spool helper also creates if
1038        // missing. Returning the path even when create fails is safe —
1039        // the helper handles both create and the actual file open.
1040        let _ = std::fs::create_dir_all(&dir);
1041        Some(dir)
1042    }
1043
1044    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1045        let dir = self.mpu_dir(bucket, upload_id);
1046        // Invalidate any cached part bodies for this upload.
1047        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1048            for entry in entries.flatten() {
1049                let path = entry.path();
1050                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1051                    if let Some(stem) = fname.strip_suffix(".bin") {
1052                        if let Ok(n) = stem.parse::<u32>() {
1053                            self.cache
1054                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1055                        }
1056                    }
1057                }
1058            }
1059        }
1060        match std::fs::remove_dir_all(&dir) {
1061            Ok(_) => Ok(()),
1062            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1063            Err(e) => Err(e.into()),
1064        }
1065    }
1066
1067    fn mpu_complete(
1068        &self,
1069        bucket: &str,
1070        upload_id: &str,
1071        final_key: &str,
1072        version: Option<&str>,
1073        meta: &ObjectMeta,
1074    ) -> StoreResult<BodyRef> {
1075        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1076        if !parts_dir.exists() {
1077            return Err(io_other(format!(
1078                "mpu_complete: no parts dir for upload {}",
1079                upload_id
1080            )));
1081        }
1082
1083        // Enumerate parts in ascending part-number order.
1084        let mut part_numbers: Vec<u32> = Vec::new();
1085        for entry in std::fs::read_dir(&parts_dir)? {
1086            let entry = entry?;
1087            let path = entry.path();
1088            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1089                continue;
1090            };
1091            if let Some(stem) = fname.strip_suffix(".toml") {
1092                if let Ok(n) = stem.parse::<u32>() {
1093                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1094                        part_numbers.push(n);
1095                    }
1096                }
1097            }
1098        }
1099        part_numbers.sort_unstable();
1100        if part_numbers.is_empty() {
1101            return Err(io_other(format!(
1102                "mpu_complete: upload {} has no parts",
1103                upload_id
1104            )));
1105        }
1106
1107        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1108        std::fs::create_dir_all(&dir)?;
1109
1110        let total_size: u64 = if part_numbers.len() == 1 {
1111            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1112            let sz = std::fs::metadata(&only)?.len();
1113            match std::fs::rename(&only, &bin_path) {
1114                Ok(_) => {}
1115                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1116                    // Cross-device rename: fall back to streaming copy then remove source.
1117                    {
1118                        let mut input = std::fs::File::open(&only)?;
1119                        let tmp = {
1120                            let mut os = bin_path.as_os_str().to_owned();
1121                            os.push(".tmp");
1122                            PathBuf::from(os)
1123                        };
1124                        let mut out = std::fs::OpenOptions::new()
1125                            .write(true)
1126                            .create(true)
1127                            .truncate(true)
1128                            .open(&tmp)?;
1129                        std::io::copy(&mut input, &mut out)?;
1130                        out.sync_all()?;
1131                        std::fs::rename(&tmp, &bin_path)?;
1132                    }
1133                    let _ = std::fs::remove_file(&only);
1134                }
1135                Err(e) => return Err(e.into()),
1136            }
1137            sz
1138        } else {
1139            let tmp = {
1140                let mut os = bin_path.as_os_str().to_owned();
1141                os.push(".tmp");
1142                PathBuf::from(os)
1143            };
1144            let mut total: u64 = 0;
1145            {
1146                let mut out = std::fs::OpenOptions::new()
1147                    .write(true)
1148                    .create(true)
1149                    .truncate(true)
1150                    .open(&tmp)?;
1151                for n in &part_numbers {
1152                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1153                    let mut input = std::fs::File::open(&part_path)?;
1154                    let copied = std::io::copy(&mut input, &mut out)?;
1155                    total += copied;
1156                }
1157                out.sync_all()?;
1158            }
1159            std::fs::rename(&tmp, &bin_path)?;
1160            if let Some(parent) = bin_path.parent() {
1161                if let Ok(dir_handle) = std::fs::File::open(parent) {
1162                    let _ = dir_handle.sync_all();
1163                }
1164            }
1165            total
1166        };
1167
1168        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1169
1170        // Invalidate per-part cache entries and drop the mpu dir.
1171        for n in &part_numbers {
1172            self.cache
1173                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1174        }
1175        let mpu_dir = self.mpu_dir(bucket, upload_id);
1176        let _ = std::fs::remove_dir_all(&mpu_dir);
1177
1178        // The concatenated body is deliberately NOT re-inserted into the cache —
1179        // large multipart uploads typically exceed the single-object cap, and we
1180        // must never round-trip the assembled body through RAM. The next
1181        // open_object_body call will load through the normal cache path.
1182        self.cache.invalidate(&crate::cache::BodyKey::new(
1183            bucket.to_string(),
1184            final_key.to_string(),
1185            version.map(|s| s.to_string()),
1186        ));
1187
1188        Ok(BodyRef::Disk {
1189            bucket: bucket.to_string(),
1190            key: final_key.to_string(),
1191            version: version.map(|s| s.to_string()),
1192            path: bin_path,
1193            size: total_size,
1194        })
1195    }
1196}
1197
/// Raw OS error code that `std::fs::rename` reports when source and
/// destination are on different filesystems/volumes.
///
/// `EXDEV` is 18 on Linux, macOS and the BSDs. Windows reports
/// `ERROR_NOT_SAME_DEVICE` (17) instead, so with a hard-coded 18 the
/// cross-device fallback in `mpu_complete` could never trigger there.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17 // ERROR_NOT_SAME_DEVICE
    } else {
        18 // EXDEV
    }
}
1201
1202#[cfg(test)]
1203mod disk_tests {
1204    use super::*;
1205    use std::sync::Arc;
1206    use tempfile::TempDir;
1207
1208    fn new_store(tmp: &TempDir) -> DiskS3Store {
1209        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1210        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1211    }
1212
1213    fn new_store_with_cache(
1214        tmp: &TempDir,
1215        cap: u64,
1216    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1217        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1218        (
1219            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1220            cache,
1221        )
1222    }
1223
1224    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1225        ObjectMeta {
1226            key: key.to_string(),
1227            content_type: "application/octet-stream".to_string(),
1228            etag: "etag".to_string(),
1229            size,
1230            ..Default::default()
1231        }
1232    }
1233
1234    #[test]
1235    fn put_bucket_meta_roundtrip() {
1236        let tmp = TempDir::new().unwrap();
1237        let store = new_store(&tmp);
1238        let meta = BucketMeta {
1239            name: "b1".to_string(),
1240            region: "us-east-1".to_string(),
1241            versioning: Some("Enabled".to_string()),
1242            ..Default::default()
1243        };
1244        store.put_bucket_meta("b1", &meta).unwrap();
1245        let loaded = store.load().unwrap();
1246        let snap = loaded.buckets.get("b1").unwrap();
1247        assert_eq!(snap.meta.name, "b1");
1248        assert_eq!(snap.meta.region, "us-east-1");
1249        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1250    }
1251
1252    #[test]
1253    fn put_bucket_subresource_each_variant_writes_file() {
1254        let tmp = TempDir::new().unwrap();
1255        let store = new_store(&tmp);
1256        store
1257            .put_bucket_meta(
1258                "b",
1259                &BucketMeta {
1260                    name: "b".to_string(),
1261                    ..Default::default()
1262                },
1263            )
1264            .unwrap();
1265        let variants = [
1266            BucketSubresource::Tags,
1267            BucketSubresource::Lifecycle,
1268            BucketSubresource::Cors,
1269            BucketSubresource::Policy,
1270            BucketSubresource::Notification,
1271            BucketSubresource::Logging,
1272            BucketSubresource::Website,
1273            BucketSubresource::PublicAccessBlock,
1274            BucketSubresource::ObjectLock,
1275            BucketSubresource::Replication,
1276            BucketSubresource::Ownership,
1277            BucketSubresource::Inventory,
1278            BucketSubresource::Encryption,
1279            BucketSubresource::Versioning,
1280            BucketSubresource::Acl,
1281            BucketSubresource::Accelerate,
1282        ];
1283        for v in variants {
1284            store
1285                .put_bucket_subresource("b", v, "payload=true")
1286                .unwrap();
1287            let file = store
1288                .bucket_dir("b")
1289                .join(DiskS3Store::subresource_filename(v));
1290            assert!(file.exists(), "{:?}", v);
1291            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1292        }
1293    }
1294
1295    #[test]
1296    fn delete_bucket_subresource_removes_file() {
1297        let tmp = TempDir::new().unwrap();
1298        let store = new_store(&tmp);
1299        store
1300            .put_bucket_meta(
1301                "b",
1302                &BucketMeta {
1303                    name: "b".to_string(),
1304                    ..Default::default()
1305                },
1306            )
1307            .unwrap();
1308        store
1309            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1310            .unwrap();
1311        store
1312            .delete_bucket_subresource("b", BucketSubresource::Tags)
1313            .unwrap();
1314        let file = store.bucket_dir("b").join("tags.toml");
1315        assert!(!file.exists());
1316        // idempotent
1317        store
1318            .delete_bucket_subresource("b", BucketSubresource::Tags)
1319            .unwrap();
1320    }
1321
1322    #[test]
1323    fn put_object_bytes_roundtrip() {
1324        let tmp = TempDir::new().unwrap();
1325        let store = new_store(&tmp);
1326        store
1327            .put_bucket_meta(
1328                "b",
1329                &BucketMeta {
1330                    name: "b".to_string(),
1331                    ..Default::default()
1332                },
1333            )
1334            .unwrap();
1335        let data = Bytes::from_static(b"hello world");
1336        let meta = sample_meta("k1", data.len() as u64);
1337        let body_ref = store
1338            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1339            .unwrap();
1340        match &body_ref {
1341            BodyRef::Disk {
1342                bucket,
1343                key,
1344                size,
1345                path,
1346                ..
1347            } => {
1348                assert_eq!(bucket, "b");
1349                assert_eq!(key, "k1");
1350                assert_eq!(*size, data.len() as u64);
1351                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1352            }
1353            _ => panic!("expected Disk"),
1354        }
1355        let loaded = store.load().unwrap();
1356        let snap = loaded.buckets.get("b").unwrap();
1357        let obj = snap.objects.get("k1").unwrap();
1358        assert_eq!(obj.meta.size, data.len() as u64);
1359    }
1360
1361    #[test]
1362    fn put_object_file_copy_source_leaves_src_in_place() {
1363        let tmp = TempDir::new().unwrap();
1364        let store = new_store(&tmp);
1365        store
1366            .put_bucket_meta(
1367                "b",
1368                &BucketMeta {
1369                    name: "b".to_string(),
1370                    ..Default::default()
1371                },
1372            )
1373            .unwrap();
1374        let src_dir = TempDir::new().unwrap();
1375        let src = src_dir.path().join("src.bin");
1376        std::fs::write(&src, b"copied-body").unwrap();
1377        let meta = sample_meta("k", 11);
1378        let bref = store
1379            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1380            .unwrap();
1381        match bref {
1382            BodyRef::Disk { path, size, .. } => {
1383                assert_eq!(size, 11);
1384                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1385            }
1386            _ => panic!("expected Disk bodyref"),
1387        }
1388        assert!(src.exists(), "source file must not be moved by FileCopy");
1389        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1390    }
1391
1392    #[test]
1393    fn put_object_file_source() {
1394        let tmp = TempDir::new().unwrap();
1395        let store = new_store(&tmp);
1396        store
1397            .put_bucket_meta(
1398                "b",
1399                &BucketMeta {
1400                    name: "b".to_string(),
1401                    ..Default::default()
1402                },
1403            )
1404            .unwrap();
1405        let src = tmp.path().join("src.bin");
1406        std::fs::write(&src, b"file-body").unwrap();
1407        let meta = sample_meta("k", 9);
1408        let body_ref = store
1409            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1410            .unwrap();
1411        let path = match body_ref {
1412            BodyRef::Disk { path, .. } => path,
1413            _ => panic!(),
1414        };
1415        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1416    }
1417
1418    #[test]
1419    fn put_object_meta_only_keeps_bin() {
1420        let tmp = TempDir::new().unwrap();
1421        let store = new_store(&tmp);
1422        store
1423            .put_bucket_meta(
1424                "b",
1425                &BucketMeta {
1426                    name: "b".to_string(),
1427                    ..Default::default()
1428                },
1429            )
1430            .unwrap();
1431        let data = Bytes::from_static(b"abc");
1432        let mut meta = sample_meta("k", 3);
1433        store
1434            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1435            .unwrap();
1436        let (_, bin, _) = store.object_paths("b", "k", None);
1437        let before = std::fs::read(&bin).unwrap();
1438        meta.tags.insert("x".to_string(), "y".to_string());
1439        store.put_object_meta("b", "k", None, &meta).unwrap();
1440        assert_eq!(std::fs::read(&bin).unwrap(), before);
1441        let loaded = store.load().unwrap();
1442        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1443        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1444    }
1445
1446    #[test]
1447    fn delete_object_cleans_up_files_and_cache() {
1448        let tmp = TempDir::new().unwrap();
1449        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1450        store
1451            .put_bucket_meta(
1452                "b",
1453                &BucketMeta {
1454                    name: "b".to_string(),
1455                    ..Default::default()
1456                },
1457            )
1458            .unwrap();
1459        let data = Bytes::from_static(b"bye");
1460        let meta = sample_meta("k", 3);
1461        store
1462            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1463            .unwrap();
1464        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1465        assert!(cache.get(&body_key).is_some());
1466        store.delete_object("b", "k", None).unwrap();
1467        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1468        assert!(!bin.exists());
1469        assert!(!toml_path.exists());
1470        assert!(!dir.exists());
1471        assert!(cache.get(&body_key).is_none());
1472    }
1473
1474    #[test]
1475    fn open_object_body_cache_hit_and_refill() {
1476        let tmp = TempDir::new().unwrap();
1477        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1478        store
1479            .put_bucket_meta(
1480                "b",
1481                &BucketMeta {
1482                    name: "b".to_string(),
1483                    ..Default::default()
1484                },
1485            )
1486            .unwrap();
1487        let data = Bytes::from_static(b"payload");
1488        let meta = sample_meta("k", data.len() as u64);
1489        let body_ref = store
1490            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1491            .unwrap();
1492        // Cache hit.
1493        let got = store.open_object_body(&body_ref).unwrap();
1494        assert_eq!(got, data);
1495        // Invalidate and re-read populates cache from disk.
1496        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1497        cache.invalidate(&body_key);
1498        assert!(cache.get(&body_key).is_none());
1499        let got = store.open_object_body(&body_ref).unwrap();
1500        assert_eq!(got, data);
1501        assert!(cache.get(&body_key).is_some());
1502    }
1503
1504    #[test]
1505    fn open_object_body_large_bypasses_cache() {
1506        let tmp = TempDir::new().unwrap();
1507        // capacity 1024 → single-object cap 512. Use 800-byte body.
1508        let (store, cache) = new_store_with_cache(&tmp, 1024);
1509        store
1510            .put_bucket_meta(
1511                "b",
1512                &BucketMeta {
1513                    name: "b".to_string(),
1514                    ..Default::default()
1515                },
1516            )
1517            .unwrap();
1518        let data = Bytes::from(vec![7u8; 800]);
1519        let meta = sample_meta("big", 800);
1520        let body_ref = store
1521            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1522            .unwrap();
1523        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1524        assert!(cache.get(&body_key).is_none());
1525        let got = store.open_object_body(&body_ref).unwrap();
1526        assert_eq!(got, data);
1527        // Still none — exceeds single-object cap.
1528        assert!(cache.get(&body_key).is_none());
1529    }
1530
1531    #[test]
1532    fn load_empty_dir() {
1533        let tmp = TempDir::new().unwrap();
1534        let store = new_store(&tmp);
1535        let s = store.load().unwrap();
1536        assert!(s.buckets.is_empty());
1537    }
1538
1539    #[test]
1540    fn load_skips_mpu_without_init() {
1541        let tmp = TempDir::new().unwrap();
1542        let store = new_store(&tmp);
1543        store
1544            .put_bucket_meta(
1545                "b",
1546                &BucketMeta {
1547                    name: "b".to_string(),
1548                    ..Default::default()
1549                },
1550            )
1551            .unwrap();
1552        let data = Bytes::from_static(b"x");
1553        let meta = sample_meta("k", 1);
1554        store
1555            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1556            .unwrap();
1557        // Directory with no init.toml is skipped by load.
1558        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1559        std::fs::create_dir_all(&mpu).unwrap();
1560
1561        let loaded = store.load().unwrap();
1562        let snap = loaded.buckets.get("b").unwrap();
1563        assert_eq!(snap.objects.len(), 1);
1564        assert!(snap.multipart_uploads.is_empty());
1565    }
1566
1567    #[test]
1568    fn load_reads_bucket_subresources() {
1569        let tmp = TempDir::new().unwrap();
1570        let store = new_store(&tmp);
1571        store
1572            .put_bucket_meta(
1573                "b",
1574                &BucketMeta {
1575                    name: "b".to_string(),
1576                    ..Default::default()
1577                },
1578            )
1579            .unwrap();
1580        store
1581            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1582            .unwrap();
1583        store
1584            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1585            .unwrap();
1586        store
1587            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1588            .unwrap();
1589        store
1590            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1591            .unwrap();
1592        let loaded = store.load().unwrap();
1593        let snap = loaded.buckets.get("b").unwrap();
1594        assert_eq!(
1595            snap.subresources.get("lifecycle.toml").map(String::as_str),
1596            Some("<Lifecycle/>"),
1597        );
1598        assert_eq!(
1599            snap.subresources.get("cors.toml").map(String::as_str),
1600            Some("<Cors/>"),
1601        );
1602        assert_eq!(
1603            snap.subresources.get("policy.toml").map(String::as_str),
1604            Some("{}"),
1605        );
1606        assert_eq!(
1607            snap.subresources.get("tags.toml").map(String::as_str),
1608            Some("x=1"),
1609        );
1610    }
1611
1612    #[test]
1613    fn versioned_put_load_roundtrip() {
1614        let tmp = TempDir::new().unwrap();
1615        let store = new_store(&tmp);
1616        store
1617            .put_bucket_meta(
1618                "b",
1619                &BucketMeta {
1620                    name: "b".to_string(),
1621                    versioning: Some("Enabled".to_string()),
1622                    ..Default::default()
1623                },
1624            )
1625            .unwrap();
1626        let base = chrono::Utc::now();
1627        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1628            .iter()
1629            .enumerate()
1630        {
1631            let mut m = sample_meta("k", body.len() as u64);
1632            m.version_id = Some((*vid).to_string());
1633            m.last_modified = base + chrono::Duration::seconds(i as i64);
1634            store
1635                .put_object(
1636                    "b",
1637                    "k",
1638                    Some(*vid),
1639                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1640                    &m,
1641                )
1642                .unwrap();
1643        }
1644        let loaded = store.load().unwrap();
1645        let snap = loaded.buckets.get("b").unwrap();
1646        let versions = snap.object_versions.get("k").expect("versions present");
1647        assert_eq!(versions.len(), 3);
1648        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1649        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1650        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1651        for v in versions {
1652            match &v.body {
1653                BodyRef::Disk { size, .. } => assert!(*size > 0),
1654                _ => panic!("expected Disk body"),
1655            }
1656        }
1657    }
1658
#[test]
fn versioned_load_promotes_latest_live_to_snap_objects() {
    // With versioning enabled and three live versions written with strictly
    // increasing `last_modified`, `load()` must surface the newest one (v3)
    // as the bucket's current object.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let t0 = chrono::Utc::now();
    let versions = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (offset, &(version, payload)) in versions.iter().enumerate() {
        let mut meta = sample_meta("k", payload.len() as u64);
        meta.version_id = Some(version.to_string());
        meta.last_modified = t0 + chrono::Duration::seconds(offset as i64);
        let body = BodySource::Bytes(Bytes::copy_from_slice(payload.as_bytes()));
        store.put_object("b", "k", Some(version), body, &meta).unwrap();
    }

    let loaded = store.load().unwrap();
    let current = loaded
        .buckets
        .get("b")
        .unwrap()
        .objects
        .get("k")
        .expect("current object promoted");
    assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
}
1696
#[test]
fn versioned_load_trailing_delete_marker_hides_current() {
    // Three live versions followed by a newer delete marker: because the
    // marker is the latest entry, nothing may be promoted into `objects`,
    // yet all four entries must remain in the version history.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let t0 = chrono::Utc::now();
    let live = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (offset, &(version, payload)) in live.iter().enumerate() {
        let mut meta = sample_meta("k", payload.len() as u64);
        meta.version_id = Some(version.to_string());
        meta.last_modified = t0 + chrono::Duration::seconds(offset as i64);
        let body = BodySource::Bytes(Bytes::copy_from_slice(payload.as_bytes()));
        store.put_object("b", "k", Some(version), body, &meta).unwrap();
    }

    // Delete marker, timestamped well after every live version.
    let mut marker = sample_meta("k", 0);
    marker.version_id = Some("dm1".to_string());
    marker.is_delete_marker = true;
    marker.last_modified = t0 + chrono::Duration::seconds(10);
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let loaded = store.load().unwrap();
    let bucket = loaded.buckets.get("b").unwrap();
    assert!(
        !bucket.objects.contains_key("k"),
        "trailing delete marker must hide current object",
    );
    assert_eq!(bucket.object_versions.get("k").unwrap().len(), 4);
}
1745
/// A null-version object written while versioning was still off must not
/// shadow versions written after versioning is enabled. On reload,
/// `snap.objects` has to reflect the newest live version (v2, 5 bytes), not
/// the stale null object (3 bytes).
#[test]
fn legacy_null_object_overridden_by_newer_versions() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    // Versioning is not set yet, so the first put below is stored without a
    // version id (the legacy "null" object).
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                ..Default::default()
            },
        )
        .unwrap();
    let base = chrono::Utc::now();
    let mut null_meta = sample_meta("k", 3);
    null_meta.last_modified = base;
    store
        .put_object(
            "b",
            "k",
            None,
            BodySource::Bytes(Bytes::from_static(b"old")),
            &null_meta,
        )
        .unwrap();
    // Enable versioning and put two live versions, v1 then v2, with strictly
    // increasing timestamps. Neither is a delete marker. v2 has a distinct
    // size (5) so the assertions can tell it apart from v1 and from the null
    // object.
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                versioning: Some("Enabled".to_string()),
                ..Default::default()
            },
        )
        .unwrap();
    let mut v1 = sample_meta("k", 3);
    v1.version_id = Some("v1".to_string());
    v1.last_modified = base + chrono::Duration::seconds(1);
    store
        .put_object(
            "b",
            "k",
            Some("v1"),
            BodySource::Bytes(Bytes::from_static(b"new")),
            &v1,
        )
        .unwrap();
    let mut v2 = sample_meta("k", 5);
    v2.version_id = Some("v2".to_string());
    v2.last_modified = base + chrono::Duration::seconds(2);
    store
        .put_object(
            "b",
            "k",
            Some("v2"),
            BodySource::Bytes(Bytes::from_static(b"newer")),
            &v2,
        )
        .unwrap();
    let loaded = store.load().unwrap();
    let snap = loaded.buckets.get("b").unwrap();
    let current = snap
        .objects
        .get("k")
        .expect("latest live version must override stale null");
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(current.meta.size, 5);
}
1817
#[test]
fn legacy_null_hidden_by_trailing_delete_marker() {
    // Sequence: a null-version put while versioning is off, then versioning
    // is enabled, a live v1 is written, and finally a delete marker (dm1)
    // with the newest timestamp. The marker must hide the key from
    // `objects` even though an older null object was also written.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let plain_bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &plain_bucket).unwrap();

    let t0 = chrono::Utc::now();
    let mut legacy = sample_meta("k", 3);
    legacy.last_modified = t0;
    let legacy_body = BodySource::Bytes(Bytes::from_static(b"old"));
    store.put_object("b", "k", None, legacy_body, &legacy).unwrap();

    let versioned_bucket = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    let mut live = sample_meta("k", 3);
    live.version_id = Some("v1".to_string());
    live.last_modified = t0 + chrono::Duration::seconds(1);
    let live_body = BodySource::Bytes(Bytes::from_static(b"new"));
    store.put_object("b", "k", Some("v1"), live_body, &live).unwrap();

    let mut marker = sample_meta("k", 0);
    marker.version_id = Some("dm1".to_string());
    marker.is_delete_marker = true;
    marker.last_modified = t0 + chrono::Duration::seconds(2);
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let loaded = store.load().unwrap();
    let hidden = !loaded.buckets.get("b").unwrap().objects.contains_key("k");
    assert!(
        hidden,
        "trailing delete marker must hide even a legacy null object",
    );
}
1879
#[test]
fn delete_marker_roundtrip_no_body_file() {
    // A delete marker is written as metadata only: its `.bin` body path must
    // not exist while its metadata path does. On reload, its body comes back
    // as an empty in-memory buffer.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let mut marker = sample_meta("k", 0);
    marker.version_id = Some("dm1".to_string());
    marker.is_delete_marker = true;
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let (_, body_path, meta_path) = store.object_paths("b", "k", Some("dm1"));
    assert!(!body_path.exists(), "delete marker must not have a .bin file");
    assert!(meta_path.exists());

    let loaded = store.load().unwrap();
    let bucket = loaded.buckets.get("b").unwrap();
    let versions = bucket.object_versions.get("k").unwrap();
    assert_eq!(versions.len(), 1);
    let only = &versions[0];
    assert!(only.meta.is_delete_marker);
    if let BodyRef::Memory(bytes) = &only.body {
        assert_eq!(bytes.len(), 0);
    } else {
        panic!("delete marker body should be empty Memory");
    }
}
1920
#[test]
fn mixed_nonversioned_and_versioned_buckets() {
    // Bucket "a" is unversioned and holds a single null-version object.
    // Bucket "b" is versioned and holds two versions of one key. Both must
    // load side by side, and each must keep its own layout.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let plain = BucketMeta {
        name: "a".to_string(),
        ..Default::default()
    };
    let versioned = BucketMeta {
        name: "b".to_string(),
        versioning: Some("Enabled".to_string()),
        ..Default::default()
    };
    store.put_bucket_meta("a", &plain).unwrap();
    store.put_bucket_meta("b", &versioned).unwrap();

    let only_meta = sample_meta("only", 3);
    let only_body = BodySource::Bytes(Bytes::from_static(b"aaa"));
    store.put_object("a", "only", None, only_body, &only_meta).unwrap();

    let t0 = chrono::Utc::now();
    for (offset, version) in ["v1", "v2"].iter().enumerate() {
        let mut meta = sample_meta("twice", 2);
        meta.version_id = Some(version.to_string());
        meta.last_modified = t0 + chrono::Duration::seconds(offset as i64);
        let body = BodySource::Bytes(Bytes::from_static(b"xx"));
        store
            .put_object("b", "twice", Some(*version), body, &meta)
            .unwrap();
    }

    let loaded = store.load().unwrap();
    assert_eq!(loaded.buckets.len(), 2);

    let bucket_a = loaded.buckets.get("a").unwrap();
    assert_eq!(bucket_a.objects.len(), 1);
    assert!(bucket_a.object_versions.is_empty());

    // In the versioned bucket, the newest live version is promoted into
    // `objects` so that unversioned GETs see it after a restart.
    let bucket_b = loaded.buckets.get("b").unwrap();
    let current = bucket_b.objects.get("twice").unwrap();
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(bucket_b.object_versions.get("twice").unwrap().len(), 2);
}
1983
1984    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1985        MpuInit {
1986            upload_id: upload_id.to_string(),
1987            key: key.to_string(),
1988            initiated: chrono::Utc::now(),
1989            content_type: "application/octet-stream".to_string(),
1990            storage_class: "STANDARD".to_string(),
1991            ..Default::default()
1992        }
1993    }
1994
#[test]
fn mpu_create_then_load_empty_parts() {
    // A freshly created multipart upload with no parts is returned by
    // `load()` with its init metadata intact and an empty part map.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();

    store
        .mpu_create("b", "up1", &sample_mpu_init("up1", "k1"))
        .unwrap();

    let loaded = store.load().unwrap();
    let upload = loaded
        .buckets
        .get("b")
        .unwrap()
        .multipart_uploads
        .get("up1")
        .expect("upload present");
    assert_eq!(upload.init.upload_id, "up1");
    assert_eq!(upload.init.key, "k1");
    assert!(upload.parts.is_empty());
}
2017
#[test]
fn mpu_put_part_then_load_three_parts() {
    // Three parts of different sizes come back from `load()` keyed by part
    // number. Each has matching number, size and etag metadata, and an
    // on-disk body whose bytes equal what was uploaded.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let payloads = [
        Bytes::from_static(b"part-one"),
        Bytes::from_static(b"part-two-longer"),
        Bytes::from_static(b"p3"),
    ];
    for (part_no, payload) in (1u32..).zip(payloads.iter()) {
        let etag = format!("et{}", part_no);
        store
            .mpu_put_part("b", "up", part_no, BodySource::Bytes(payload.clone()), &etag)
            .unwrap();
    }

    let loaded = store.load().unwrap();
    let upload = loaded
        .buckets
        .get("b")
        .unwrap()
        .multipart_uploads
        .get("up")
        .unwrap();
    assert_eq!(upload.parts.len(), 3);
    for (part_no, payload) in (1u32..).zip(payloads.iter()) {
        let part = upload.parts.get(&part_no).unwrap();
        let want_len = payload.len() as u64;
        assert_eq!(part.meta.part_number, part_no);
        assert_eq!(part.meta.size, want_len);
        assert_eq!(part.meta.etag, format!("et{}", part_no));
        if let BodyRef::Disk { path, size, .. } = &part.body {
            assert_eq!(*size, want_len);
            assert_eq!(std::fs::read(path).unwrap(), payload.to_vec());
        } else {
            panic!("expected Disk");
        }
    }
}
2070
#[test]
fn mpu_abort_removes_upload() {
    // Aborting removes the upload's directory. A second abort still
    // succeeds, and `load()` afterwards reports no multipart uploads.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();
    let part = BodySource::Bytes(Bytes::from_static(b"x"));
    store.mpu_put_part("b", "up", 1, part, "e").unwrap();

    store.mpu_abort("b", "up").unwrap();
    assert!(!store.mpu_dir("b", "up").exists());
    // Aborting an already-aborted upload must not fail.
    store.mpu_abort("b", "up").unwrap();

    let loaded = store.load().unwrap();
    let uploads = &loaded.buckets.get("b").unwrap().multipart_uploads;
    assert!(uploads.is_empty());
}
2104
#[test]
fn mpu_complete_single_part_fast_path() {
    // Completing an upload that has exactly one part yields a Disk body
    // holding that part's bytes. The upload directory is gone afterwards.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let payload = Bytes::from_static(b"only-part-bytes");
    let want_len = payload.len() as u64;
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(payload.clone()), "et")
        .unwrap();

    let object_meta = sample_meta("k", want_len);
    let completed = store
        .mpu_complete("b", "up", "k", None, &object_meta)
        .unwrap();
    if let BodyRef::Disk { path, size, .. } = &completed {
        assert_eq!(*size, want_len);
        assert_eq!(std::fs::read(path).unwrap(), payload.to_vec());
    } else {
        panic!("expected Disk");
    }
    assert!(!store.mpu_dir("b", "up").exists());
}
2136
#[test]
fn mpu_complete_multi_part_concat() {
    // Completing a three-part upload produces a single Disk body whose
    // contents are the parts concatenated in part-number order.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let p1 = Bytes::from_static(b"AAAA");
    let p2 = Bytes::from_static(b"BBBBBB");
    let p3 = Bytes::from_static(b"CC");
    for (part_no, payload) in (1u32..).zip([&p1, &p2, &p3]) {
        store
            .mpu_put_part("b", "up", part_no, BodySource::Bytes(payload.clone()), "e")
            .unwrap();
    }
    let expected: Vec<u8> = [&p1[..], &p2[..], &p3[..]].concat();

    let object_meta = sample_meta("k", expected.len() as u64);
    let completed = store
        .mpu_complete("b", "up", "k", None, &object_meta)
        .unwrap();
    let path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk");
    };
    assert_eq!(std::fs::read(&path).unwrap(), expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2177
#[test]
fn mpu_complete_large_streaming_via_file_source() {
    // Parts are supplied from files on disk (`BodySource::File`) instead of
    // in-memory bytes, exercising the file-source + streaming-concat path.
    // Each part is 1 MiB of a distinct fill byte, so misordering or
    // truncation is detectable while the test stays fast.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    const CHUNK: usize = 1024 * 1024;
    let fills: [u8; 3] = [0x11, 0x22, 0x33];
    let mut expected: Vec<u8> = Vec::with_capacity(CHUNK * fills.len());
    for (part_no, fill) in (1u32..).zip(fills.iter().copied()) {
        let chunk = vec![fill; CHUNK];
        let src = dir.path().join(format!("src-{}.bin", part_no));
        std::fs::write(&src, &chunk).unwrap();
        expected.extend_from_slice(&chunk);
        store
            .mpu_put_part("b", "up", part_no, BodySource::File(src), "et")
            .unwrap();
    }

    let object_meta = sample_meta("k", expected.len() as u64);
    let completed = store
        .mpu_complete("b", "up", "k", None, &object_meta)
        .unwrap();
    let path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk");
    };
    let actual = std::fs::read(&path).unwrap();
    assert_eq!(actual.len(), expected.len());
    assert_eq!(actual, expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2223
/// An in-progress multipart upload must survive a restart. A second store
/// opened on the same directory must see the parts written so far via
/// `load()`, accept a further part, and complete the upload into the
/// concatenation of all parts in part-number order.
#[test]
fn mpu_resumable_across_load() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                ..Default::default()
            },
        )
        .unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();
    let p1 = Bytes::from_static(b"hello-");
    let p2 = Bytes::from_static(b"world-");
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
        .unwrap();
    store
        .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
        .unwrap();

    // Simulate a restart: re-open the store on the same dir and load.
    let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
    let loaded = store2.load().unwrap();
    let snap = loaded.buckets.get("b").unwrap();
    let m = snap.multipart_uploads.get("up").unwrap();
    assert_eq!(m.parts.len(), 2);
    assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
    assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");

    // Continue the upload on the fresh store.
    let p3 = Bytes::from_static(b"again!");
    store2
        .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
        .unwrap();
    let mut expected = Vec::new();
    expected.extend_from_slice(&p1);
    expected.extend_from_slice(&p2);
    expected.extend_from_slice(&p3);
    let meta = sample_meta("k", expected.len() as u64);
    let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
    // Like the other MPU completion tests: check the reported size and give
    // the panic a message so an unexpected variant is diagnosable.
    let path = match body_ref {
        BodyRef::Disk { path, size, .. } => {
            assert_eq!(size, expected.len() as u64);
            path
        }
        _ => panic!("expected Disk"),
    };
    assert_eq!(std::fs::read(&path).unwrap(), expected);
    assert!(!store2.mpu_dir("b", "up").exists());
}
2276
#[test]
fn tags_snapshot_roundtrip_via_store() {
    // A `TagsSnapshot` serialized to TOML and stored as the Tags subresource
    // is returned by `load()` under "tags.toml" and decodes to the same map.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();

    let tags: BTreeMap<String, String> = [("env", "prod"), ("team", "s3")]
        .iter()
        .map(|&(k, v)| (k.to_string(), v.to_string()))
        .collect();
    let payload = toml::to_string(&TagsSnapshot { tags: tags.clone() }).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
        .unwrap();

    let loaded = store.load().unwrap();
    let stored = &loaded.buckets.get("b").unwrap().subresources;
    let text = stored.get("tags.toml").cloned().unwrap();
    let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.tags, tags);
}
2310
#[test]
fn acl_snapshot_roundtrip_via_store() {
    // An `AclSnapshot` with one canonical-user grant and one group grant
    // survives a TOML round trip through the Acl subresource ("acl.toml").
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();

    let owner_grant = AclGrantSnapshot {
        grantee_type: "CanonicalUser".to_string(),
        grantee_id: Some("owner-abc".to_string()),
        grantee_display_name: Some("owner".to_string()),
        grantee_uri: None,
        permission: "FULL_CONTROL".to_string(),
    };
    let public_read = AclGrantSnapshot {
        grantee_type: "Group".to_string(),
        grantee_id: None,
        grantee_display_name: None,
        grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
        permission: "READ".to_string(),
    };
    let acl = AclSnapshot {
        owner_id: "owner-abc".to_string(),
        grants: vec![owner_grant, public_read],
    };
    let payload = toml::to_string(&acl).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
        .unwrap();

    let loaded = store.load().unwrap();
    let stored = &loaded.buckets.get("b").unwrap().subresources;
    let text = stored.get("acl.toml").cloned().unwrap();
    let decoded: AclSnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
    assert_eq!(decoded.grants[1].grantee_type, "Group");
}
2364
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    // An `InventorySnapshot` holding two configurations survives a TOML
    // round trip through the Inventory subresource ("inventory.toml").
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let bucket = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket).unwrap();

    let configs: BTreeMap<String, String> = [
        ("inv-1", "<InventoryConfiguration id=\"inv-1\"/>"),
        ("inv-2", "<InventoryConfiguration id=\"inv-2\"/>"),
    ]
    .iter()
    .map(|&(id, xml)| (id.to_string(), xml.to_string()))
    .collect();
    let inventory = InventorySnapshot {
        configs: configs.clone(),
    };
    let payload = toml::to_string(&inventory).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
        .unwrap();

    let loaded = store.load().unwrap();
    let stored = &loaded.buckets.get("b").unwrap().subresources;
    let text = stored.get("inventory.toml").cloned().unwrap();
    let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.configs, configs);
}
2406
#[test]
fn legacy_versioning_file_is_read() {
    // A legacy bare `versioning.toml` sidecar in the bucket directory must
    // still be honoured. When the bucket meta has no versioning value,
    // `load()` reports the status found in that file.
    let dir = TempDir::new().unwrap();
    let store = new_store(&dir);
    let meta_without_versioning = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &meta_without_versioning).unwrap();

    let sidecar = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&sidecar, "Enabled").unwrap();

    let loaded = store.load().unwrap();
    let versioning = loaded.buckets.get("b").unwrap().meta.versioning.as_deref();
    assert_eq!(versioning, Some("Enabled"));
}
2428}