Skip to main content

fakecloud_persistence/
s3.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Snapshot of all persisted S3 state, as returned by [`S3Store::load`].
///
/// Every field is `#[serde(default)]`, so a serialized form that omits any
/// of them still deserializes (to an empty string / empty map).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    /// Account identifier; empty when absent from the serialized form.
    #[serde(default)]
    pub account_id: String,
    /// Region name; empty when absent from the serialized form.
    #[serde(default)]
    pub region: String,
    /// Per-bucket snapshots. `DiskS3Store::load` keys this map by
    /// `BucketMeta::name`.
    #[serde(default)]
    pub buckets: HashMap<String, BucketSnapshot>,
}
18
/// Everything loaded for a single bucket: its metadata, current objects,
/// version history, raw sub-resource documents and in-flight multipart
/// uploads.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket-level metadata. Has no `#[serde(default)]`, so it must be
    /// present when deserializing.
    pub meta: BucketMeta,
    /// Current object per key. `DiskS3Store::load` keys this by
    /// `ObjectMeta::key`.
    #[serde(default)]
    pub objects: HashMap<String, LoadedObject>,
    /// Version list per key. `DiskS3Store::load` stores each list sorted by
    /// ascending `last_modified` (newest last).
    #[serde(default)]
    pub object_versions: HashMap<String, Vec<LoadedObject>>,
    /// Raw sub-resource text. `DiskS3Store::load` keys this by the
    /// sub-resource file name (for example `cors.toml`).
    #[serde(default)]
    pub subresources: HashMap<String, String>,
    /// In-flight multipart uploads. `DiskS3Store::load` keys this by
    /// `MpuInit::upload_id`.
    #[serde(default)]
    pub multipart_uploads: HashMap<String, LoadedMpu>,
}
31
/// A multipart upload as loaded from a store: the initiation record plus
/// the parts uploaded so far.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Parameters captured when the upload was created.
    pub init: MpuInit,
    /// Uploaded parts keyed (and therefore ordered) by part number.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One multipart-upload part: its metadata plus a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata (number, etag, size, timestamp).
    pub meta: UploadPartMeta,
    /// Where the part bytes live; defaults to an empty in-memory body.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One object (or object version): its metadata plus a reference to its
/// body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata.
    pub meta: ObjectMeta,
    /// Where the object bytes live; defaults to an empty in-memory body.
    /// `DiskS3Store::load` uses an empty in-memory body for delete markers.
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata. `DiskS3Store` persists it as `meta.toml` inside
/// the bucket directory.
///
/// Every field has a serde default, so older or partial files still load.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name. `DiskS3Store::load` uses this (not the directory name)
    /// as the key in `S3State::buckets`.
    #[serde(default)]
    pub name: String,
    /// Creation timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    /// Region the bucket belongs to.
    #[serde(default)]
    pub region: String,
    /// Versioning status string, if any. When `None`, `DiskS3Store::load`
    /// back-fills it from the trimmed contents of `versioning.toml`.
    #[serde(default)]
    pub versioning: Option<String>,
    /// Bucket ACL payload, if any (format is not visible in this file).
    #[serde(default)]
    pub acl: Option<String>,
    /// Owner id associated with the ACL.
    #[serde(default)]
    pub acl_owner_id: String,
    /// Transfer-acceleration status string, if any.
    #[serde(default)]
    pub accelerate_status: Option<String>,
    /// Whether EventBridge delivery is enabled for the bucket.
    #[serde(default)]
    pub eventbridge_enabled: bool,
}
72
73fn default_time() -> DateTime<Utc> {
74    DateTime::<Utc>::MIN_UTC
75}
76
/// A single ACL grant in persisted form: who the grantee is and which
/// permission they hold.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Kind of grantee. Required when deserializing (no serde default).
    pub grantee_type: String,
    /// Grantee id, when the grantee is identified by id.
    #[serde(default)]
    pub grantee_id: Option<String>,
    /// Optional display name for the grantee.
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    /// Grantee URI, when the grantee is identified by URI.
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Granted permission. Required when deserializing (no serde default).
    pub permission: String,
}
88
/// Persisted form of a tag set: tag key mapped to tag value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    /// Tag key/value pairs; empty when absent from the serialized form.
    #[serde(default)]
    pub tags: HashMap<String, String>,
}
94
/// Persisted form of an access-control list: an owner plus its grants.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    /// Id of the ACL owner; empty when absent from the serialized form.
    #[serde(default)]
    pub owner_id: String,
    /// Individual grants; empty when absent from the serialized form.
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}
102
/// Persisted form of a bucket's inventory configurations.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    /// String-to-string map of configurations.
    // NOTE(review): presumably configuration id -> raw configuration
    // document; the producer is not visible in this file — confirm.
    #[serde(default)]
    pub configs: HashMap<String, String>,
}
108
/// Metadata for one object or object version. `DiskS3Store` persists it as
/// the `<version-tag>.toml` sidecar next to the `<version-tag>.bin` body.
///
/// Every field has a serde default so sidecars written by older versions
/// (with fewer fields) still deserialize.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses this (not the escaped directory
    /// name) as the map key for the object.
    #[serde(default)]
    pub key: String,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub etag: String,
    /// Size recorded in the metadata, in bytes.
    #[serde(default)]
    pub size: u64,
    /// Last-modified timestamp; falls back to `default_time()` when absent.
    /// `DiskS3Store::load` orders versions of a key by this field.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    /// Key/value metadata attached to the object.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub tags: HashMap<String, String>,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    /// Number of parts, when recorded.
    // NOTE(review): presumably set for multipart-completed objects — confirm.
    #[serde(default)]
    pub parts_count: Option<u32>,
    /// `(u32, u64)` pairs, when recorded.
    // NOTE(review): presumably (part number, part size in bytes) — confirm.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// Version id. `DiskS3Store::load` treats a `null.toml` sidecar whose
    /// `version_id` is `None` as a non-versioned object.
    #[serde(default)]
    pub version_id: Option<String>,
    /// `true` for delete markers, which have no body file on disk.
    #[serde(default)]
    pub is_delete_marker: bool,
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    #[serde(default)]
    pub restore_expiry: Option<String>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    #[serde(default)]
    pub checksum_value: Option<String>,
    #[serde(default)]
    pub lock_mode: Option<String>,
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    #[serde(default)]
    pub content_encoding: Option<String>,
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
164
/// Parameters captured when a multipart upload is created. `DiskS3Store`
/// reads it back from `init.toml` inside the upload directory.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    /// Upload id. Required when deserializing (no serde default).
    pub upload_id: String,
    /// Key of the object being uploaded. Required when deserializing.
    pub key: String,
    /// Initiation timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    /// Key/value metadata supplied at initiation.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub content_type: String,
    #[serde(default)]
    pub storage_class: String,
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Tagging string supplied at initiation, if any (format not visible
    /// in this file).
    #[serde(default)]
    pub tagging: Option<String>,
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}
188
/// Metadata for one multipart-upload part. `DiskS3Store` reads it back
/// from `<part_number>.toml` inside the upload's `parts` directory.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    /// Part number. Note that `DiskS3Store::load` keys parts by the number
    /// parsed from the file name, not by this field.
    pub part_number: u32,
    /// ETag recorded for the part.
    pub etag: String,
    /// Part size recorded in the metadata, in bytes.
    pub size: u64,
    /// Last-modified timestamp; falls back to `default_time()` when absent.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
197
/// Identifies one kind of bucket sub-resource document.
///
/// `DiskS3Store` maps each variant to a file name inside the bucket
/// directory. When adding a variant, also add it to [`ALL_SUBRESOURCES`]:
/// `DiskS3Store::load` only scans the kinds listed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
224
/// Every [`BucketSubresource`] variant, in declaration order.
///
/// `DiskS3Store::load` iterates this slice to discover which sub-resource
/// files exist for a bucket, so a variant missing here is never loaded.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
    BucketSubresource::Analytics,
    BucketSubresource::IntelligentTiering,
    BucketSubresource::Metrics,
    BucketSubresource::RequestPayment,
    BucketSubresource::Abac,
    BucketSubresource::MetadataConfiguration,
    BucketSubresource::MetadataTableConfiguration,
];
250
/// Reference to an object or part body: either the bytes themselves or a
/// pointer to a file on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Body held in RAM. Marked `#[serde(skip)]`, so this variant is never
    /// serialized or deserialized.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file.
    Disk {
        /// Bucket the body belongs to.
        bucket: String,
        /// Object key (multipart parts use a `__mpu__/<upload_id>` pseudo-key).
        key: String,
        /// Version id, or `None` for the "null" version (multipart parts
        /// use `part-<n>`).
        #[serde(default)]
        version: Option<String>,
        /// Location of the body file.
        path: PathBuf,
        /// Body length in bytes, as recorded when the reference was built.
        size: u64,
    },
}
264
265impl BodyRef {
266    pub fn size(&self) -> u64 {
267        match self {
268            BodyRef::Memory(b) => b.len() as u64,
269            BodyRef::Disk { size, .. } => *size,
270        }
271    }
272}
273
274impl Default for BodyRef {
275    fn default() -> Self {
276        BodyRef::Memory(Bytes::new())
277    }
278}
279
/// Where the bytes of a body being stored come from. The variant also
/// tells the store whether it may consume the source.
#[derive(Debug)]
pub enum BodySource {
    /// Body already held in memory.
    Bytes(Bytes),
    /// Existing disk path that should be *moved* into the destination via
    /// rename (upload-tmp → final) and consumed.
    File(PathBuf),
    /// Existing disk path that should be *copied* into the destination and
    /// left in place. Used by replication so the source object stays
    /// available while the replica is produced.
    FileCopy(PathBuf),
}
291
292/// Materialize a [`BodySource`] into [`Bytes`]. The `File` variant is
293/// consumed (file is unlinked after read); `FileCopy` is left in place.
294/// Memory-mode stores call this to absorb tempfiles produced by the
295/// streaming dispatch path so streaming and non-streaming services share
296/// the same wire-shape regardless of `--storage-mode`.
297fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
298    match body {
299        BodySource::Bytes(b) => Ok(b),
300        BodySource::File(p) => {
301            let bytes = std::fs::read(&p)?;
302            // Best-effort: a leftover tempfile is not a correctness
303            // problem so the read result wins over the unlink result.
304            let _ = std::fs::remove_file(&p);
305            Ok(Bytes::from(bytes))
306        }
307        BodySource::FileCopy(p) => {
308            let bytes = std::fs::read(&p)?;
309            Ok(Bytes::from(bytes))
310        }
311    }
312}
313
/// Errors returned by [`S3Store`] implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying filesystem error. `#[from]` lets `?` convert a
    /// `std::io::Error` directly.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A persisted document could not be (de)serialized; carries the
    /// underlying error rendered as text.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not available on this store implementation.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by a free-form message (for example a
    /// metadata sidecar whose body file is missing).
    #[error("{0}")]
    Other(String),
}
325
/// Result alias used by every store operation in this module.
pub type StoreResult<T> = Result<T, StoreError>;
327
/// Persistence backend for S3 state.
///
/// Two implementations are visible in this file: `MemoryS3Store`, which
/// persists nothing and keeps bodies in RAM, and `DiskS3Store`, which
/// writes TOML metadata and `.bin` body files under a root directory.
pub trait S3Store: Send + Sync {
    /// Load everything the store has persisted.
    fn load(&self) -> StoreResult<S3State>;

    /// Persist (create or replace) the metadata of `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persist the raw `payload` of one sub-resource of `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Remove one sub-resource of `bucket`.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Remove `bucket` and everything stored under it.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Persist an object body together with its metadata and return the
    /// [`BodyRef`] the caller should keep for later reads. `version` is
    /// `None` for the "null" version.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Persist object metadata only, without supplying a body.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Remove one object version (`None` addresses the "null" version).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Materialize the bytes behind `body`.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Record the start of a multipart upload.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Persist a multipart-upload part body. Returns the [`BodyRef`] the
    /// service should keep in its in-memory part map: `BodyRef::Disk`
    /// for disk-backed stores (so subsequent reads stream from the
    /// `.bin` file instead of holding the part in RAM), or
    /// `BodyRef::Memory` for memory-mode stores.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<BodyRef>;
    /// Where the streaming dispatch path should spool large request
    /// bodies to disk before handing them to [`Self::put_object`] /
    /// [`Self::mpu_put_part`]. Returning `Some` keeps the spool file on
    /// the same filesystem as the final destination so `rename(2)` is a
    /// metadata-only move; returning `None` means "use the system temp
    /// dir, the store will read the file back into RAM and unlink it".
    ///
    /// The default implementation returns `None`.
    fn spool_dir(&self) -> Option<std::path::PathBuf> {
        None
    }
    /// Discard a multipart upload.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Finish a multipart upload as the object `final_key` (with the given
    /// `version` and `meta`) and return the [`BodyRef`] of the result.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
392
/// Stateless [`S3Store`] that persists nothing: metadata operations are
/// no-ops and bodies are handed back as `BodyRef::Memory`.
pub struct MemoryS3Store;
394
395impl MemoryS3Store {
396    pub fn new() -> Self {
397        Self
398    }
399}
400
401impl Default for MemoryS3Store {
402    fn default() -> Self {
403        Self::new()
404    }
405}
406
407impl S3Store for MemoryS3Store {
408    fn load(&self) -> StoreResult<S3State> {
409        Ok(S3State::default())
410    }
411
412    fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
413        Ok(())
414    }
415    fn put_bucket_subresource(
416        &self,
417        _bucket: &str,
418        _kind: BucketSubresource,
419        _payload: &str,
420    ) -> StoreResult<()> {
421        Ok(())
422    }
423    fn delete_bucket_subresource(
424        &self,
425        _bucket: &str,
426        _kind: BucketSubresource,
427    ) -> StoreResult<()> {
428        Ok(())
429    }
430    fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
431        Ok(())
432    }
433
434    fn put_object(
435        &self,
436        _bucket: &str,
437        _key: &str,
438        _version: Option<&str>,
439        body: BodySource,
440        _meta: &ObjectMeta,
441    ) -> StoreResult<BodyRef> {
442        Ok(BodyRef::Memory(read_body_source(body)?))
443    }
444    fn put_object_meta(
445        &self,
446        _bucket: &str,
447        _key: &str,
448        _version: Option<&str>,
449        _meta: &ObjectMeta,
450    ) -> StoreResult<()> {
451        Ok(())
452    }
453    fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
454        Ok(())
455    }
456    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
457        match body {
458            BodyRef::Memory(b) => Ok(b.clone()),
459            BodyRef::Disk { .. } => {
460                panic!("MemoryS3Store cannot open Disk-backed BodyRef")
461            }
462        }
463    }
464
465    fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
466        Ok(())
467    }
468    fn mpu_put_part(
469        &self,
470        _bucket: &str,
471        _upload_id: &str,
472        _part_number: u32,
473        body: BodySource,
474        _etag: &str,
475    ) -> StoreResult<BodyRef> {
476        // Memory mode keeps all part bodies in RAM. Absorb any tempfile
477        // produced by the streaming dispatch path here.
478        Ok(BodyRef::Memory(read_body_source(body)?))
479    }
480    fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
481        Ok(())
482    }
483    fn mpu_complete(
484        &self,
485        _bucket: &str,
486        _upload_id: &str,
487        _final_key: &str,
488        _version: Option<&str>,
489        _meta: &ObjectMeta,
490    ) -> StoreResult<BodyRef> {
491        Ok(BodyRef::Memory(Bytes::new()))
492    }
493}
494
/// Disk-backed [`S3Store`]: TOML metadata and `.bin` body files under
/// `root`, with a shared body cache.
pub struct DiskS3Store {
    /// Directory under which the `buckets/` tree is stored.
    root: PathBuf,
    /// Shared cache of object bodies, keyed by `crate::cache::BodyKey`.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
499
500impl DiskS3Store {
501    pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
502        Self { root, cache }
503    }
504
505    fn buckets_dir(&self) -> PathBuf {
506        self.root.join("buckets")
507    }
508
509    fn bucket_dir(&self, bucket: &str) -> PathBuf {
510        self.buckets_dir()
511            .join(crate::key_escape::escape_key_segment(bucket))
512    }
513
514    fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
515        self.bucket_dir(bucket)
516            .join("objects")
517            .join(crate::key_escape::escape_key_segment(key))
518    }
519
520    fn version_tag(version: Option<&str>) -> String {
521        version.unwrap_or("null").to_string()
522    }
523
524    fn object_paths(
525        &self,
526        bucket: &str,
527        key: &str,
528        version: Option<&str>,
529    ) -> (PathBuf, PathBuf, PathBuf) {
530        let dir = self.object_dir(bucket, key);
531        let tag = Self::version_tag(version);
532        let bin = dir.join(format!("{}.bin", tag));
533        let toml = dir.join(format!("{}.toml", tag));
534        (dir, bin, toml)
535    }
536
537    fn subresource_filename(kind: BucketSubresource) -> &'static str {
538        match kind {
539            BucketSubresource::Tags => "tags.toml",
540            BucketSubresource::Lifecycle => "lifecycle.toml",
541            BucketSubresource::Cors => "cors.toml",
542            BucketSubresource::Policy => "policy.toml",
543            BucketSubresource::Notification => "notification.toml",
544            BucketSubresource::Logging => "logging.toml",
545            BucketSubresource::Website => "website.toml",
546            BucketSubresource::PublicAccessBlock => "public_access_block.toml",
547            BucketSubresource::ObjectLock => "object_lock.toml",
548            BucketSubresource::Replication => "replication.toml",
549            BucketSubresource::Ownership => "ownership.toml",
550            BucketSubresource::Inventory => "inventory.toml",
551            BucketSubresource::Encryption => "encryption.toml",
552            BucketSubresource::Versioning => "versioning.toml",
553            BucketSubresource::Acl => "acl.toml",
554            BucketSubresource::Accelerate => "accelerate.toml",
555            BucketSubresource::Analytics => "analytics.toml",
556            BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
557            BucketSubresource::Metrics => "metrics.toml",
558            BucketSubresource::RequestPayment => "request_payment.toml",
559            BucketSubresource::Abac => "abac.toml",
560            BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
561            BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
562        }
563    }
564
565    fn cleanup_empty(dir: &std::path::Path) {
566        let _ = std::fs::remove_dir(dir);
567    }
568
569    fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
570        self.bucket_dir(bucket)
571            .join("mpu")
572            .join(crate::key_escape::escape_key_segment(upload_id))
573    }
574
575    fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
576        self.mpu_dir(bucket, upload_id).join("parts")
577    }
578
579    fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
580        self.mpu_parts_dir(bucket, upload_id)
581            .join(format!("{}.bin", part_number))
582    }
583
584    fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
585        self.mpu_parts_dir(bucket, upload_id)
586            .join(format!("{}.toml", part_number))
587    }
588
589    fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
590        crate::cache::BodyKey::new(
591            bucket.to_string(),
592            format!("__mpu__/{}", upload_id),
593            Some(format!("part-{}", part_number)),
594        )
595    }
596}
597
598fn io_other(msg: impl Into<String>) -> StoreError {
599    StoreError::Other(msg.into())
600}
601
602impl S3Store for DiskS3Store {
603    fn load(&self) -> StoreResult<S3State> {
604        let mut state = S3State::default();
605        let buckets_dir = self.buckets_dir();
606        if !buckets_dir.exists() {
607            return Ok(state);
608        }
609        for entry in std::fs::read_dir(&buckets_dir)? {
610            let entry = entry?;
611            if !entry.file_type()?.is_dir() {
612                continue;
613            }
614            let bdir = entry.path();
615            let meta_path = bdir.join("meta.toml");
616            if !meta_path.exists() {
617                continue;
618            }
619            let meta_text = std::fs::read_to_string(&meta_path)?;
620            let mut meta: BucketMeta =
621                toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
622            let mut snap = BucketSnapshot {
623                meta: meta.clone(),
624                objects: HashMap::new(),
625                object_versions: HashMap::new(),
626                subresources: HashMap::new(),
627                multipart_uploads: HashMap::new(),
628            };
629
630            for kind in ALL_SUBRESOURCES {
631                let fname = Self::subresource_filename(*kind);
632                let path = bdir.join(fname);
633                if path.exists() {
634                    let text = std::fs::read_to_string(&path)?;
635                    if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
636                        let stripped = text.trim();
637                        if !stripped.is_empty() {
638                            snap.meta.versioning = Some(stripped.to_string());
639                            meta.versioning = snap.meta.versioning.clone();
640                        }
641                    }
642                    snap.subresources.insert(fname.to_string(), text);
643                }
644            }
645
646            let objects_root = bdir.join("objects");
647            if objects_root.exists() {
648                for okey_entry in std::fs::read_dir(&objects_root)? {
649                    let okey_entry = okey_entry?;
650                    if !okey_entry.file_type()?.is_dir() {
651                        continue;
652                    }
653                    let key_dir = okey_entry.path();
654                    let mut versioned: Vec<LoadedObject> = Vec::new();
655                    let mut key_name: Option<String> = None;
656                    for version_entry in std::fs::read_dir(&key_dir)? {
657                        let version_entry = version_entry?;
658                        let path = version_entry.path();
659                        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
660                            continue;
661                        };
662                        if !fname.ends_with(".toml") {
663                            continue;
664                        }
665                        let version_tag = &fname[..fname.len() - 5];
666                        let toml_text = std::fs::read_to_string(&path)?;
667                        let obj_meta: ObjectMeta = toml::from_str(&toml_text)
668                            .map_err(|e| StoreError::Serde(e.to_string()))?;
669                        let bin_path = key_dir.join(format!("{}.bin", version_tag));
670                        let (body, size) = if obj_meta.is_delete_marker {
671                            (BodyRef::Memory(Bytes::new()), 0u64)
672                        } else if bin_path.exists() {
673                            let sz = std::fs::metadata(&bin_path)?.len();
674                            (
675                                BodyRef::Disk {
676                                    bucket: meta.name.clone(),
677                                    key: obj_meta.key.clone(),
678                                    version: if version_tag == "null" {
679                                        None
680                                    } else {
681                                        Some(version_tag.to_string())
682                                    },
683                                    path: bin_path,
684                                    size: sz,
685                                },
686                                sz,
687                            )
688                        } else {
689                            // Fail loud: the sidecar says this object has a
690                            // body but the .bin file is missing. Returning
691                            // silently would hand the caller a truncated
692                            // object and hide data loss.
693                            return Err(StoreError::Other(format!(
694                                "missing body file: {}",
695                                bin_path.display()
696                            )));
697                        };
698                        let _ = size;
699                        key_name.get_or_insert_with(|| obj_meta.key.clone());
700                        if version_tag == "null" && obj_meta.version_id.is_none() {
701                            snap.objects.insert(
702                                obj_meta.key.clone(),
703                                LoadedObject {
704                                    meta: obj_meta,
705                                    body,
706                                },
707                            );
708                        } else {
709                            versioned.push(LoadedObject {
710                                meta: obj_meta,
711                                body,
712                            });
713                        }
714                    }
715                    if !versioned.is_empty() {
716                        versioned.sort_by_key(|v| v.meta.last_modified);
717                        if let Some(key) = key_name {
718                            // Reconcile snap.objects with the newest version:
719                            // a trailing delete marker hides any prior null or
720                            // live version (remove it); otherwise overwrite
721                            // with the newest live version, even if a
722                            // pre-versioning null.toml had already been
723                            // inserted during the non-versioned scan.
724                            match versioned.last() {
725                                Some(newest) if newest.meta.is_delete_marker => {
726                                    snap.objects.remove(&key);
727                                }
728                                Some(newest) => {
729                                    snap.objects.insert(key.clone(), newest.clone());
730                                }
731                                None => {}
732                            }
733                            snap.object_versions.insert(key, versioned);
734                        }
735                    }
736                }
737            }
738
739            let mpu_root = bdir.join("mpu");
740            if mpu_root.exists() {
741                for upload_entry in std::fs::read_dir(&mpu_root)? {
742                    let upload_entry = upload_entry?;
743                    if !upload_entry.file_type()?.is_dir() {
744                        continue;
745                    }
746                    let upload_dir = upload_entry.path();
747                    let init_path = upload_dir.join("init.toml");
748                    if !init_path.exists() {
749                        continue;
750                    }
751                    let init_text = std::fs::read_to_string(&init_path)?;
752                    let init: MpuInit =
753                        toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
754                    let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
755                    let parts_dir = upload_dir.join("parts");
756                    if parts_dir.exists() {
757                        for part_entry in std::fs::read_dir(&parts_dir)? {
758                            let part_entry = part_entry?;
759                            let path = part_entry.path();
760                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
761                                continue;
762                            };
763                            if !fname.ends_with(".toml") {
764                                continue;
765                            }
766                            let stem = &fname[..fname.len() - 5];
767                            let Ok(part_number) = stem.parse::<u32>() else {
768                                continue;
769                            };
770                            let toml_text = std::fs::read_to_string(&path)?;
771                            let part_meta: UploadPartMeta = toml::from_str(&toml_text)
772                                .map_err(|e| StoreError::Serde(e.to_string()))?;
773                            let bin_path = parts_dir.join(format!("{}.bin", part_number));
774                            if !bin_path.exists() {
775                                return Err(StoreError::Other(format!(
776                                    "missing multipart part body file: {}",
777                                    bin_path.display()
778                                )));
779                            }
780                            let sz = std::fs::metadata(&bin_path)?.len();
781                            let body = BodyRef::Disk {
782                                bucket: meta.name.clone(),
783                                key: format!("__mpu__/{}", init.upload_id),
784                                version: Some(format!("part-{}", part_number)),
785                                path: bin_path,
786                                size: sz,
787                            };
788                            loaded_parts.insert(
789                                part_number,
790                                LoadedPart {
791                                    meta: part_meta,
792                                    body,
793                                },
794                            );
795                        }
796                    }
797                    snap.multipart_uploads.insert(
798                        init.upload_id.clone(),
799                        LoadedMpu {
800                            init,
801                            parts: loaded_parts,
802                        },
803                    );
804                }
805            }
806
807            state.buckets.insert(meta.name.clone(), snap);
808        }
809        Ok(state)
810    }
811
812    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
813        let dir = self.bucket_dir(bucket);
814        std::fs::create_dir_all(&dir)?;
815        crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
816        Ok(())
817    }
818
819    fn put_bucket_subresource(
820        &self,
821        bucket: &str,
822        kind: BucketSubresource,
823        payload: &str,
824    ) -> StoreResult<()> {
825        let dir = self.bucket_dir(bucket);
826        std::fs::create_dir_all(&dir)?;
827        let path = dir.join(Self::subresource_filename(kind));
828        crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
829        Ok(())
830    }
831
832    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
833        let path = self
834            .bucket_dir(bucket)
835            .join(Self::subresource_filename(kind));
836        match std::fs::remove_file(&path) {
837            Ok(_) => Ok(()),
838            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
839            Err(e) => Err(e.into()),
840        }
841    }
842
843    fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
844        let dir = self.bucket_dir(bucket);
845        match std::fs::remove_dir_all(&dir) {
846            Ok(_) => Ok(()),
847            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
848            Err(e) => Err(e.into()),
849        }
850    }
851
852    fn put_object(
853        &self,
854        bucket: &str,
855        key: &str,
856        version: Option<&str>,
857        body: BodySource,
858        meta: &ObjectMeta,
859    ) -> StoreResult<BodyRef> {
860        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
861        std::fs::create_dir_all(&dir)?;
862
863        if meta.is_delete_marker {
864            crate::atomic::write_atomic_toml(&toml_path, meta)?;
865            return Ok(BodyRef::Memory(Bytes::new()));
866        }
867
868        let size: u64;
869        let bytes_for_cache: Option<Bytes>;
870        match body {
871            BodySource::Bytes(b) => {
872                size = b.len() as u64;
873                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
874                bytes_for_cache = Some(b);
875            }
876            BodySource::File(src) => {
877                let src_size = std::fs::metadata(&src)?.len();
878                size = src_size;
879                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
880                bytes_for_cache = None;
881            }
882            BodySource::FileCopy(src) => {
883                let src_size = std::fs::metadata(&src)?.len();
884                size = src_size;
885                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
886                bytes_for_cache = None;
887            }
888        }
889
890        crate::atomic::write_atomic_toml(&toml_path, meta)?;
891
892        let body_key = crate::cache::BodyKey::new(
893            bucket.to_string(),
894            key.to_string(),
895            version.map(|s| s.to_string()),
896        );
897        if let Some(b) = bytes_for_cache {
898            self.cache.insert(body_key, b);
899        } else {
900            self.cache.invalidate(&crate::cache::BodyKey::new(
901                bucket.to_string(),
902                key.to_string(),
903                version.map(|s| s.to_string()),
904            ));
905        }
906
907        Ok(BodyRef::Disk {
908            bucket: bucket.to_string(),
909            key: key.to_string(),
910            version: version.map(|s| s.to_string()),
911            path: bin_path,
912            size,
913        })
914    }
915
916    fn put_object_meta(
917        &self,
918        bucket: &str,
919        key: &str,
920        version: Option<&str>,
921        meta: &ObjectMeta,
922    ) -> StoreResult<()> {
923        let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
924        std::fs::create_dir_all(&dir)?;
925        crate::atomic::write_atomic_toml(&toml_path, meta)?;
926        Ok(())
927    }
928
929    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
930        let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
931        for p in [&bin_path, &toml_path] {
932            match std::fs::remove_file(p) {
933                Ok(_) => {}
934                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
935                Err(e) => return Err(e.into()),
936            }
937        }
938        Self::cleanup_empty(&dir);
939
940        self.cache.invalidate(&crate::cache::BodyKey::new(
941            bucket.to_string(),
942            key.to_string(),
943            version.map(|s| s.to_string()),
944        ));
945        Ok(())
946    }
947
948    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
949        match body {
950            BodyRef::Memory(b) => Ok(b.clone()),
951            BodyRef::Disk {
952                bucket,
953                key,
954                version,
955                path,
956                size: _,
957            } => {
958                let body_key =
959                    crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
960                if let Some(bytes) = self.cache.get(&body_key) {
961                    return Ok(bytes);
962                }
963                let bytes = Bytes::from(std::fs::read(path)?);
964                self.cache.insert(body_key, bytes.clone());
965                Ok(bytes)
966            }
967        }
968    }
969
970    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
971        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
972        std::fs::create_dir_all(&parts_dir)?;
973        let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
974        crate::atomic::write_atomic_toml(&init_path, init)?;
975        Ok(())
976    }
977
978    fn mpu_put_part(
979        &self,
980        bucket: &str,
981        upload_id: &str,
982        part_number: u32,
983        body: BodySource,
984        etag: &str,
985    ) -> StoreResult<BodyRef> {
986        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
987        std::fs::create_dir_all(&parts_dir)?;
988        let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
989        let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
990
991        let size: u64 = match body {
992            BodySource::Bytes(b) => {
993                let n = b.len() as u64;
994                crate::atomic::write_atomic_bytes(&bin_path, &b)?;
995                let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
996                self.cache.insert(cache_key, b);
997                n
998            }
999            BodySource::File(src) => {
1000                let n = std::fs::metadata(&src)?.len();
1001                crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1002                self.cache
1003                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1004                n
1005            }
1006            BodySource::FileCopy(src) => {
1007                let n = std::fs::metadata(&src)?.len();
1008                crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1009                self.cache
1010                    .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1011                n
1012            }
1013        };
1014
1015        let meta = UploadPartMeta {
1016            part_number,
1017            etag: etag.to_string(),
1018            size,
1019            last_modified: Utc::now(),
1020        };
1021        crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1022        // Disk-mode parts live on disk; expose them as BodyRef::Disk so
1023        // the runtime state never holds the part body in RAM.
1024        Ok(BodyRef::Disk {
1025            bucket: bucket.to_string(),
1026            key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1027            version: None,
1028            path: bin_path,
1029            size,
1030        })
1031    }
1032
1033    fn spool_dir(&self) -> Option<std::path::PathBuf> {
1034        let dir = self.root.join(".spool");
1035        // Best-effort: create lazily; the spool helper also creates if
1036        // missing. Returning the path even when create fails is safe —
1037        // the helper handles both create and the actual file open.
1038        let _ = std::fs::create_dir_all(&dir);
1039        Some(dir)
1040    }
1041
1042    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1043        let dir = self.mpu_dir(bucket, upload_id);
1044        // Invalidate any cached part bodies for this upload.
1045        if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1046            for entry in entries.flatten() {
1047                let path = entry.path();
1048                if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1049                    if let Some(stem) = fname.strip_suffix(".bin") {
1050                        if let Ok(n) = stem.parse::<u32>() {
1051                            self.cache
1052                                .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1053                        }
1054                    }
1055                }
1056            }
1057        }
1058        match std::fs::remove_dir_all(&dir) {
1059            Ok(_) => Ok(()),
1060            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1061            Err(e) => Err(e.into()),
1062        }
1063    }
1064
1065    fn mpu_complete(
1066        &self,
1067        bucket: &str,
1068        upload_id: &str,
1069        final_key: &str,
1070        version: Option<&str>,
1071        meta: &ObjectMeta,
1072    ) -> StoreResult<BodyRef> {
1073        let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1074        if !parts_dir.exists() {
1075            return Err(io_other(format!(
1076                "mpu_complete: no parts dir for upload {}",
1077                upload_id
1078            )));
1079        }
1080
1081        // Enumerate parts in ascending part-number order.
1082        let mut part_numbers: Vec<u32> = Vec::new();
1083        for entry in std::fs::read_dir(&parts_dir)? {
1084            let entry = entry?;
1085            let path = entry.path();
1086            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1087                continue;
1088            };
1089            if let Some(stem) = fname.strip_suffix(".toml") {
1090                if let Ok(n) = stem.parse::<u32>() {
1091                    if self.mpu_part_bin(bucket, upload_id, n).exists() {
1092                        part_numbers.push(n);
1093                    }
1094                }
1095            }
1096        }
1097        part_numbers.sort_unstable();
1098        if part_numbers.is_empty() {
1099            return Err(io_other(format!(
1100                "mpu_complete: upload {} has no parts",
1101                upload_id
1102            )));
1103        }
1104
1105        let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1106        std::fs::create_dir_all(&dir)?;
1107
1108        let total_size: u64 = if part_numbers.len() == 1 {
1109            let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1110            let sz = std::fs::metadata(&only)?.len();
1111            match std::fs::rename(&only, &bin_path) {
1112                Ok(_) => {}
1113                Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1114                    // Cross-device rename: fall back to streaming copy then remove source.
1115                    {
1116                        let mut input = std::fs::File::open(&only)?;
1117                        let tmp = {
1118                            let mut os = bin_path.as_os_str().to_owned();
1119                            os.push(".tmp");
1120                            PathBuf::from(os)
1121                        };
1122                        let mut out = std::fs::OpenOptions::new()
1123                            .write(true)
1124                            .create(true)
1125                            .truncate(true)
1126                            .open(&tmp)?;
1127                        std::io::copy(&mut input, &mut out)?;
1128                        out.sync_all()?;
1129                        std::fs::rename(&tmp, &bin_path)?;
1130                    }
1131                    let _ = std::fs::remove_file(&only);
1132                }
1133                Err(e) => return Err(e.into()),
1134            }
1135            sz
1136        } else {
1137            let tmp = {
1138                let mut os = bin_path.as_os_str().to_owned();
1139                os.push(".tmp");
1140                PathBuf::from(os)
1141            };
1142            let mut total: u64 = 0;
1143            {
1144                let mut out = std::fs::OpenOptions::new()
1145                    .write(true)
1146                    .create(true)
1147                    .truncate(true)
1148                    .open(&tmp)?;
1149                for n in &part_numbers {
1150                    let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1151                    let mut input = std::fs::File::open(&part_path)?;
1152                    let copied = std::io::copy(&mut input, &mut out)?;
1153                    total += copied;
1154                }
1155                out.sync_all()?;
1156            }
1157            std::fs::rename(&tmp, &bin_path)?;
1158            if let Some(parent) = bin_path.parent() {
1159                if let Ok(dir_handle) = std::fs::File::open(parent) {
1160                    let _ = dir_handle.sync_all();
1161                }
1162            }
1163            total
1164        };
1165
1166        crate::atomic::write_atomic_toml(&toml_path, meta)?;
1167
1168        // Invalidate per-part cache entries and drop the mpu dir.
1169        for n in &part_numbers {
1170            self.cache
1171                .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1172        }
1173        let mpu_dir = self.mpu_dir(bucket, upload_id);
1174        let _ = std::fs::remove_dir_all(&mpu_dir);
1175
1176        // The concatenated body is deliberately NOT re-inserted into the cache —
1177        // large multipart uploads typically exceed the single-object cap, and we
1178        // must never round-trip the assembled body through RAM. The next
1179        // open_object_body call will load through the normal cache path.
1180        self.cache.invalidate(&crate::cache::BodyKey::new(
1181            bucket.to_string(),
1182            final_key.to_string(),
1183            version.map(|s| s.to_string()),
1184        ));
1185
1186        Ok(BodyRef::Disk {
1187            bucket: bucket.to_string(),
1188            key: final_key.to_string(),
1189            version: version.map(|s| s.to_string()),
1190            path: bin_path,
1191            size: total_size,
1192        })
1193    }
1194}
1195
/// Returns the raw OS error code that `std::fs::rename` reports when source and
/// destination are on different devices/volumes.
///
/// On Unix-like targets this is `EXDEV` (18). On Windows the equivalent
/// condition is `ERROR_NOT_SAME_DEVICE` (17); code 18 there means something
/// unrelated, so the value is chosen per target.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17
    } else {
        18
    }
}
1199
1200#[cfg(test)]
1201mod disk_tests {
1202    use super::*;
1203    use std::sync::Arc;
1204    use tempfile::TempDir;
1205
1206    fn new_store(tmp: &TempDir) -> DiskS3Store {
1207        let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1208        DiskS3Store::new(tmp.path().to_path_buf(), cache)
1209    }
1210
1211    fn new_store_with_cache(
1212        tmp: &TempDir,
1213        cap: u64,
1214    ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1215        let cache = Arc::new(crate::cache::BodyCache::new(cap));
1216        (
1217            DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1218            cache,
1219        )
1220    }
1221
1222    fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1223        ObjectMeta {
1224            key: key.to_string(),
1225            content_type: "application/octet-stream".to_string(),
1226            etag: "etag".to_string(),
1227            size,
1228            ..Default::default()
1229        }
1230    }
1231
1232    #[test]
1233    fn put_bucket_meta_roundtrip() {
1234        let tmp = TempDir::new().unwrap();
1235        let store = new_store(&tmp);
1236        let meta = BucketMeta {
1237            name: "b1".to_string(),
1238            region: "us-east-1".to_string(),
1239            versioning: Some("Enabled".to_string()),
1240            ..Default::default()
1241        };
1242        store.put_bucket_meta("b1", &meta).unwrap();
1243        let loaded = store.load().unwrap();
1244        let snap = loaded.buckets.get("b1").unwrap();
1245        assert_eq!(snap.meta.name, "b1");
1246        assert_eq!(snap.meta.region, "us-east-1");
1247        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1248    }
1249
1250    #[test]
1251    fn put_bucket_subresource_each_variant_writes_file() {
1252        let tmp = TempDir::new().unwrap();
1253        let store = new_store(&tmp);
1254        store
1255            .put_bucket_meta(
1256                "b",
1257                &BucketMeta {
1258                    name: "b".to_string(),
1259                    ..Default::default()
1260                },
1261            )
1262            .unwrap();
1263        let variants = [
1264            BucketSubresource::Tags,
1265            BucketSubresource::Lifecycle,
1266            BucketSubresource::Cors,
1267            BucketSubresource::Policy,
1268            BucketSubresource::Notification,
1269            BucketSubresource::Logging,
1270            BucketSubresource::Website,
1271            BucketSubresource::PublicAccessBlock,
1272            BucketSubresource::ObjectLock,
1273            BucketSubresource::Replication,
1274            BucketSubresource::Ownership,
1275            BucketSubresource::Inventory,
1276            BucketSubresource::Encryption,
1277            BucketSubresource::Versioning,
1278            BucketSubresource::Acl,
1279            BucketSubresource::Accelerate,
1280        ];
1281        for v in variants {
1282            store
1283                .put_bucket_subresource("b", v, "payload=true")
1284                .unwrap();
1285            let file = store
1286                .bucket_dir("b")
1287                .join(DiskS3Store::subresource_filename(v));
1288            assert!(file.exists(), "{:?}", v);
1289            assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1290        }
1291    }
1292
1293    #[test]
1294    fn delete_bucket_subresource_removes_file() {
1295        let tmp = TempDir::new().unwrap();
1296        let store = new_store(&tmp);
1297        store
1298            .put_bucket_meta(
1299                "b",
1300                &BucketMeta {
1301                    name: "b".to_string(),
1302                    ..Default::default()
1303                },
1304            )
1305            .unwrap();
1306        store
1307            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1308            .unwrap();
1309        store
1310            .delete_bucket_subresource("b", BucketSubresource::Tags)
1311            .unwrap();
1312        let file = store.bucket_dir("b").join("tags.toml");
1313        assert!(!file.exists());
1314        // idempotent
1315        store
1316            .delete_bucket_subresource("b", BucketSubresource::Tags)
1317            .unwrap();
1318    }
1319
1320    #[test]
1321    fn put_object_bytes_roundtrip() {
1322        let tmp = TempDir::new().unwrap();
1323        let store = new_store(&tmp);
1324        store
1325            .put_bucket_meta(
1326                "b",
1327                &BucketMeta {
1328                    name: "b".to_string(),
1329                    ..Default::default()
1330                },
1331            )
1332            .unwrap();
1333        let data = Bytes::from_static(b"hello world");
1334        let meta = sample_meta("k1", data.len() as u64);
1335        let body_ref = store
1336            .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1337            .unwrap();
1338        match &body_ref {
1339            BodyRef::Disk {
1340                bucket,
1341                key,
1342                size,
1343                path,
1344                ..
1345            } => {
1346                assert_eq!(bucket, "b");
1347                assert_eq!(key, "k1");
1348                assert_eq!(*size, data.len() as u64);
1349                assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1350            }
1351            _ => panic!("expected Disk"),
1352        }
1353        let loaded = store.load().unwrap();
1354        let snap = loaded.buckets.get("b").unwrap();
1355        let obj = snap.objects.get("k1").unwrap();
1356        assert_eq!(obj.meta.size, data.len() as u64);
1357    }
1358
1359    #[test]
1360    fn put_object_file_copy_source_leaves_src_in_place() {
1361        let tmp = TempDir::new().unwrap();
1362        let store = new_store(&tmp);
1363        store
1364            .put_bucket_meta(
1365                "b",
1366                &BucketMeta {
1367                    name: "b".to_string(),
1368                    ..Default::default()
1369                },
1370            )
1371            .unwrap();
1372        let src_dir = TempDir::new().unwrap();
1373        let src = src_dir.path().join("src.bin");
1374        std::fs::write(&src, b"copied-body").unwrap();
1375        let meta = sample_meta("k", 11);
1376        let bref = store
1377            .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1378            .unwrap();
1379        match bref {
1380            BodyRef::Disk { path, size, .. } => {
1381                assert_eq!(size, 11);
1382                assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1383            }
1384            _ => panic!("expected Disk bodyref"),
1385        }
1386        assert!(src.exists(), "source file must not be moved by FileCopy");
1387        assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1388    }
1389
1390    #[test]
1391    fn put_object_file_source() {
1392        let tmp = TempDir::new().unwrap();
1393        let store = new_store(&tmp);
1394        store
1395            .put_bucket_meta(
1396                "b",
1397                &BucketMeta {
1398                    name: "b".to_string(),
1399                    ..Default::default()
1400                },
1401            )
1402            .unwrap();
1403        let src = tmp.path().join("src.bin");
1404        std::fs::write(&src, b"file-body").unwrap();
1405        let meta = sample_meta("k", 9);
1406        let body_ref = store
1407            .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1408            .unwrap();
1409        let path = match body_ref {
1410            BodyRef::Disk { path, .. } => path,
1411            _ => panic!(),
1412        };
1413        assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1414    }
1415
1416    #[test]
1417    fn put_object_meta_only_keeps_bin() {
1418        let tmp = TempDir::new().unwrap();
1419        let store = new_store(&tmp);
1420        store
1421            .put_bucket_meta(
1422                "b",
1423                &BucketMeta {
1424                    name: "b".to_string(),
1425                    ..Default::default()
1426                },
1427            )
1428            .unwrap();
1429        let data = Bytes::from_static(b"abc");
1430        let mut meta = sample_meta("k", 3);
1431        store
1432            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1433            .unwrap();
1434        let (_, bin, _) = store.object_paths("b", "k", None);
1435        let before = std::fs::read(&bin).unwrap();
1436        meta.tags.insert("x".to_string(), "y".to_string());
1437        store.put_object_meta("b", "k", None, &meta).unwrap();
1438        assert_eq!(std::fs::read(&bin).unwrap(), before);
1439        let loaded = store.load().unwrap();
1440        let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1441        assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1442    }
1443
1444    #[test]
1445    fn delete_object_cleans_up_files_and_cache() {
1446        let tmp = TempDir::new().unwrap();
1447        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1448        store
1449            .put_bucket_meta(
1450                "b",
1451                &BucketMeta {
1452                    name: "b".to_string(),
1453                    ..Default::default()
1454                },
1455            )
1456            .unwrap();
1457        let data = Bytes::from_static(b"bye");
1458        let meta = sample_meta("k", 3);
1459        store
1460            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1461            .unwrap();
1462        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1463        assert!(cache.get(&body_key).is_some());
1464        store.delete_object("b", "k", None).unwrap();
1465        let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1466        assert!(!bin.exists());
1467        assert!(!toml_path.exists());
1468        assert!(!dir.exists());
1469        assert!(cache.get(&body_key).is_none());
1470    }
1471
1472    #[test]
1473    fn open_object_body_cache_hit_and_refill() {
1474        let tmp = TempDir::new().unwrap();
1475        let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1476        store
1477            .put_bucket_meta(
1478                "b",
1479                &BucketMeta {
1480                    name: "b".to_string(),
1481                    ..Default::default()
1482                },
1483            )
1484            .unwrap();
1485        let data = Bytes::from_static(b"payload");
1486        let meta = sample_meta("k", data.len() as u64);
1487        let body_ref = store
1488            .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1489            .unwrap();
1490        // Cache hit.
1491        let got = store.open_object_body(&body_ref).unwrap();
1492        assert_eq!(got, data);
1493        // Invalidate and re-read populates cache from disk.
1494        let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1495        cache.invalidate(&body_key);
1496        assert!(cache.get(&body_key).is_none());
1497        let got = store.open_object_body(&body_ref).unwrap();
1498        assert_eq!(got, data);
1499        assert!(cache.get(&body_key).is_some());
1500    }
1501
1502    #[test]
1503    fn open_object_body_large_bypasses_cache() {
1504        let tmp = TempDir::new().unwrap();
1505        // capacity 1024 → single-object cap 512. Use 800-byte body.
1506        let (store, cache) = new_store_with_cache(&tmp, 1024);
1507        store
1508            .put_bucket_meta(
1509                "b",
1510                &BucketMeta {
1511                    name: "b".to_string(),
1512                    ..Default::default()
1513                },
1514            )
1515            .unwrap();
1516        let data = Bytes::from(vec![7u8; 800]);
1517        let meta = sample_meta("big", 800);
1518        let body_ref = store
1519            .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1520            .unwrap();
1521        let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1522        assert!(cache.get(&body_key).is_none());
1523        let got = store.open_object_body(&body_ref).unwrap();
1524        assert_eq!(got, data);
1525        // Still none — exceeds single-object cap.
1526        assert!(cache.get(&body_key).is_none());
1527    }
1528
1529    #[test]
1530    fn load_empty_dir() {
1531        let tmp = TempDir::new().unwrap();
1532        let store = new_store(&tmp);
1533        let s = store.load().unwrap();
1534        assert!(s.buckets.is_empty());
1535    }
1536
1537    #[test]
1538    fn load_skips_mpu_without_init() {
1539        let tmp = TempDir::new().unwrap();
1540        let store = new_store(&tmp);
1541        store
1542            .put_bucket_meta(
1543                "b",
1544                &BucketMeta {
1545                    name: "b".to_string(),
1546                    ..Default::default()
1547                },
1548            )
1549            .unwrap();
1550        let data = Bytes::from_static(b"x");
1551        let meta = sample_meta("k", 1);
1552        store
1553            .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1554            .unwrap();
1555        // Directory with no init.toml is skipped by load.
1556        let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1557        std::fs::create_dir_all(&mpu).unwrap();
1558
1559        let loaded = store.load().unwrap();
1560        let snap = loaded.buckets.get("b").unwrap();
1561        assert_eq!(snap.objects.len(), 1);
1562        assert!(snap.multipart_uploads.is_empty());
1563    }
1564
1565    #[test]
1566    fn load_reads_bucket_subresources() {
1567        let tmp = TempDir::new().unwrap();
1568        let store = new_store(&tmp);
1569        store
1570            .put_bucket_meta(
1571                "b",
1572                &BucketMeta {
1573                    name: "b".to_string(),
1574                    ..Default::default()
1575                },
1576            )
1577            .unwrap();
1578        store
1579            .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1580            .unwrap();
1581        store
1582            .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1583            .unwrap();
1584        store
1585            .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1586            .unwrap();
1587        store
1588            .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1589            .unwrap();
1590        let loaded = store.load().unwrap();
1591        let snap = loaded.buckets.get("b").unwrap();
1592        assert_eq!(
1593            snap.subresources.get("lifecycle.toml").map(String::as_str),
1594            Some("<Lifecycle/>"),
1595        );
1596        assert_eq!(
1597            snap.subresources.get("cors.toml").map(String::as_str),
1598            Some("<Cors/>"),
1599        );
1600        assert_eq!(
1601            snap.subresources.get("policy.toml").map(String::as_str),
1602            Some("{}"),
1603        );
1604        assert_eq!(
1605            snap.subresources.get("tags.toml").map(String::as_str),
1606            Some("x=1"),
1607        );
1608    }
1609
1610    #[test]
1611    fn versioned_put_load_roundtrip() {
1612        let tmp = TempDir::new().unwrap();
1613        let store = new_store(&tmp);
1614        store
1615            .put_bucket_meta(
1616                "b",
1617                &BucketMeta {
1618                    name: "b".to_string(),
1619                    versioning: Some("Enabled".to_string()),
1620                    ..Default::default()
1621                },
1622            )
1623            .unwrap();
1624        let base = chrono::Utc::now();
1625        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1626            .iter()
1627            .enumerate()
1628        {
1629            let mut m = sample_meta("k", body.len() as u64);
1630            m.version_id = Some((*vid).to_string());
1631            m.last_modified = base + chrono::Duration::seconds(i as i64);
1632            store
1633                .put_object(
1634                    "b",
1635                    "k",
1636                    Some(*vid),
1637                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1638                    &m,
1639                )
1640                .unwrap();
1641        }
1642        let loaded = store.load().unwrap();
1643        let snap = loaded.buckets.get("b").unwrap();
1644        let versions = snap.object_versions.get("k").expect("versions present");
1645        assert_eq!(versions.len(), 3);
1646        assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1647        assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1648        assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1649        for v in versions {
1650            match &v.body {
1651                BodyRef::Disk { size, .. } => assert!(*size > 0),
1652                _ => panic!("expected Disk body"),
1653            }
1654        }
1655    }
1656
1657    #[test]
1658    fn versioned_load_promotes_latest_live_to_snap_objects() {
1659        let tmp = TempDir::new().unwrap();
1660        let store = new_store(&tmp);
1661        store
1662            .put_bucket_meta(
1663                "b",
1664                &BucketMeta {
1665                    name: "b".to_string(),
1666                    versioning: Some("Enabled".to_string()),
1667                    ..Default::default()
1668                },
1669            )
1670            .unwrap();
1671        let base = chrono::Utc::now();
1672        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1673            .iter()
1674            .enumerate()
1675        {
1676            let mut m = sample_meta("k", body.len() as u64);
1677            m.version_id = Some((*vid).to_string());
1678            m.last_modified = base + chrono::Duration::seconds(i as i64);
1679            store
1680                .put_object(
1681                    "b",
1682                    "k",
1683                    Some(*vid),
1684                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1685                    &m,
1686                )
1687                .unwrap();
1688        }
1689        let loaded = store.load().unwrap();
1690        let snap = loaded.buckets.get("b").unwrap();
1691        let current = snap.objects.get("k").expect("current object promoted");
1692        assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1693    }
1694
1695    #[test]
1696    fn versioned_load_trailing_delete_marker_hides_current() {
1697        let tmp = TempDir::new().unwrap();
1698        let store = new_store(&tmp);
1699        store
1700            .put_bucket_meta(
1701                "b",
1702                &BucketMeta {
1703                    name: "b".to_string(),
1704                    versioning: Some("Enabled".to_string()),
1705                    ..Default::default()
1706                },
1707            )
1708            .unwrap();
1709        let base = chrono::Utc::now();
1710        for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1711            .iter()
1712            .enumerate()
1713        {
1714            let mut m = sample_meta("k", body.len() as u64);
1715            m.version_id = Some((*vid).to_string());
1716            m.last_modified = base + chrono::Duration::seconds(i as i64);
1717            store
1718                .put_object(
1719                    "b",
1720                    "k",
1721                    Some(*vid),
1722                    BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1723                    &m,
1724                )
1725                .unwrap();
1726        }
1727        // Append a delete marker on top.
1728        let mut dm = sample_meta("k", 0);
1729        dm.version_id = Some("dm1".to_string());
1730        dm.is_delete_marker = true;
1731        dm.last_modified = base + chrono::Duration::seconds(10);
1732        store
1733            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1734            .unwrap();
1735        let loaded = store.load().unwrap();
1736        let snap = loaded.buckets.get("b").unwrap();
1737        assert!(
1738            !snap.objects.contains_key("k"),
1739            "trailing delete marker must hide current object",
1740        );
1741        assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1742    }
1743
1744    #[test]
1745    fn legacy_null_object_overridden_by_newer_versions() {
1746        // A pre-versioning null put followed by versioning-enabled puts must
1747        // see snap.objects reflect the newest live version, not the stale null.
1748        let tmp = TempDir::new().unwrap();
1749        let store = new_store(&tmp);
1750        store
1751            .put_bucket_meta(
1752                "b",
1753                &BucketMeta {
1754                    name: "b".to_string(),
1755                    ..Default::default()
1756                },
1757            )
1758            .unwrap();
1759        let base = chrono::Utc::now();
1760        let mut null_meta = sample_meta("k", 3);
1761        null_meta.last_modified = base;
1762        store
1763            .put_object(
1764                "b",
1765                "k",
1766                None,
1767                BodySource::Bytes(Bytes::from_static(b"old")),
1768                &null_meta,
1769            )
1770            .unwrap();
1771        // Enable versioning and put two versions, the second a delete.
1772        store
1773            .put_bucket_meta(
1774                "b",
1775                &BucketMeta {
1776                    name: "b".to_string(),
1777                    versioning: Some("Enabled".to_string()),
1778                    ..Default::default()
1779                },
1780            )
1781            .unwrap();
1782        let mut v1 = sample_meta("k", 3);
1783        v1.version_id = Some("v1".to_string());
1784        v1.last_modified = base + chrono::Duration::seconds(1);
1785        store
1786            .put_object(
1787                "b",
1788                "k",
1789                Some("v1"),
1790                BodySource::Bytes(Bytes::from_static(b"new")),
1791                &v1,
1792            )
1793            .unwrap();
1794        let mut v2 = sample_meta("k", 5);
1795        v2.version_id = Some("v2".to_string());
1796        v2.last_modified = base + chrono::Duration::seconds(2);
1797        store
1798            .put_object(
1799                "b",
1800                "k",
1801                Some("v2"),
1802                BodySource::Bytes(Bytes::from_static(b"newer")),
1803                &v2,
1804            )
1805            .unwrap();
1806        let loaded = store.load().unwrap();
1807        let snap = loaded.buckets.get("b").unwrap();
1808        let current = snap
1809            .objects
1810            .get("k")
1811            .expect("latest live version must override stale null");
1812        assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1813        assert_eq!(current.meta.size, 5);
1814    }
1815
1816    #[test]
1817    fn legacy_null_hidden_by_trailing_delete_marker() {
1818        let tmp = TempDir::new().unwrap();
1819        let store = new_store(&tmp);
1820        store
1821            .put_bucket_meta(
1822                "b",
1823                &BucketMeta {
1824                    name: "b".to_string(),
1825                    ..Default::default()
1826                },
1827            )
1828            .unwrap();
1829        let base = chrono::Utc::now();
1830        let mut null_meta = sample_meta("k", 3);
1831        null_meta.last_modified = base;
1832        store
1833            .put_object(
1834                "b",
1835                "k",
1836                None,
1837                BodySource::Bytes(Bytes::from_static(b"old")),
1838                &null_meta,
1839            )
1840            .unwrap();
1841        store
1842            .put_bucket_meta(
1843                "b",
1844                &BucketMeta {
1845                    name: "b".to_string(),
1846                    versioning: Some("Enabled".to_string()),
1847                    ..Default::default()
1848                },
1849            )
1850            .unwrap();
1851        let mut v1 = sample_meta("k", 3);
1852        v1.version_id = Some("v1".to_string());
1853        v1.last_modified = base + chrono::Duration::seconds(1);
1854        store
1855            .put_object(
1856                "b",
1857                "k",
1858                Some("v1"),
1859                BodySource::Bytes(Bytes::from_static(b"new")),
1860                &v1,
1861            )
1862            .unwrap();
1863        let mut dm = sample_meta("k", 0);
1864        dm.version_id = Some("dm1".to_string());
1865        dm.is_delete_marker = true;
1866        dm.last_modified = base + chrono::Duration::seconds(2);
1867        store
1868            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1869            .unwrap();
1870        let loaded = store.load().unwrap();
1871        let snap = loaded.buckets.get("b").unwrap();
1872        assert!(
1873            !snap.objects.contains_key("k"),
1874            "trailing delete marker must hide even a legacy null object",
1875        );
1876    }
1877
1878    #[test]
1879    fn delete_marker_roundtrip_no_body_file() {
1880        let tmp = TempDir::new().unwrap();
1881        let store = new_store(&tmp);
1882        store
1883            .put_bucket_meta(
1884                "b",
1885                &BucketMeta {
1886                    name: "b".to_string(),
1887                    versioning: Some("Enabled".to_string()),
1888                    ..Default::default()
1889                },
1890            )
1891            .unwrap();
1892        let mut m = sample_meta("k", 0);
1893        m.version_id = Some("dm1".to_string());
1894        m.is_delete_marker = true;
1895        store
1896            .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1897            .unwrap();
1898        // No .bin file on disk for delete markers.
1899        let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1900        assert!(!bin.exists(), "delete marker must not have a .bin file");
1901        assert!(toml_path.exists());
1902
1903        let loaded = store.load().unwrap();
1904        let versions = loaded
1905            .buckets
1906            .get("b")
1907            .unwrap()
1908            .object_versions
1909            .get("k")
1910            .unwrap();
1911        assert_eq!(versions.len(), 1);
1912        assert!(versions[0].meta.is_delete_marker);
1913        match &versions[0].body {
1914            BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1915            _ => panic!("delete marker body should be empty Memory"),
1916        }
1917    }
1918
1919    #[test]
1920    fn mixed_nonversioned_and_versioned_buckets() {
1921        let tmp = TempDir::new().unwrap();
1922        let store = new_store(&tmp);
1923        store
1924            .put_bucket_meta(
1925                "a",
1926                &BucketMeta {
1927                    name: "a".to_string(),
1928                    ..Default::default()
1929                },
1930            )
1931            .unwrap();
1932        store
1933            .put_bucket_meta(
1934                "b",
1935                &BucketMeta {
1936                    name: "b".to_string(),
1937                    versioning: Some("Enabled".to_string()),
1938                    ..Default::default()
1939                },
1940            )
1941            .unwrap();
1942        let ma = sample_meta("only", 3);
1943        store
1944            .put_object(
1945                "a",
1946                "only",
1947                None,
1948                BodySource::Bytes(Bytes::from_static(b"aaa")),
1949                &ma,
1950            )
1951            .unwrap();
1952        let base = chrono::Utc::now();
1953        for (i, vid) in ["v1", "v2"].iter().enumerate() {
1954            let mut m = sample_meta("twice", 2);
1955            m.version_id = Some((*vid).to_string());
1956            m.last_modified = base + chrono::Duration::seconds(i as i64);
1957            store
1958                .put_object(
1959                    "b",
1960                    "twice",
1961                    Some(*vid),
1962                    BodySource::Bytes(Bytes::from_static(b"xx")),
1963                    &m,
1964                )
1965                .unwrap();
1966        }
1967        let loaded = store.load().unwrap();
1968        assert_eq!(loaded.buckets.len(), 2);
1969        let a = loaded.buckets.get("a").unwrap();
1970        assert_eq!(a.objects.len(), 1);
1971        assert!(a.object_versions.is_empty());
1972        let b = loaded.buckets.get("b").unwrap();
1973        // Fix #5: the latest live version is promoted into objects so
1974        // unversioned GETs see it post-restart.
1975        assert_eq!(
1976            b.objects.get("twice").unwrap().meta.version_id.as_deref(),
1977            Some("v2")
1978        );
1979        assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
1980    }
1981
1982    fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1983        MpuInit {
1984            upload_id: upload_id.to_string(),
1985            key: key.to_string(),
1986            initiated: chrono::Utc::now(),
1987            content_type: "application/octet-stream".to_string(),
1988            storage_class: "STANDARD".to_string(),
1989            ..Default::default()
1990        }
1991    }
1992
1993    #[test]
1994    fn mpu_create_then_load_empty_parts() {
1995        let tmp = TempDir::new().unwrap();
1996        let store = new_store(&tmp);
1997        store
1998            .put_bucket_meta(
1999                "b",
2000                &BucketMeta {
2001                    name: "b".to_string(),
2002                    ..Default::default()
2003                },
2004            )
2005            .unwrap();
2006        let init = sample_mpu_init("up1", "k1");
2007        store.mpu_create("b", "up1", &init).unwrap();
2008        let loaded = store.load().unwrap();
2009        let snap = loaded.buckets.get("b").unwrap();
2010        let m = snap.multipart_uploads.get("up1").expect("upload present");
2011        assert_eq!(m.init.upload_id, "up1");
2012        assert_eq!(m.init.key, "k1");
2013        assert!(m.parts.is_empty());
2014    }
2015
2016    #[test]
2017    fn mpu_put_part_then_load_three_parts() {
2018        let tmp = TempDir::new().unwrap();
2019        let store = new_store(&tmp);
2020        store
2021            .put_bucket_meta(
2022                "b",
2023                &BucketMeta {
2024                    name: "b".to_string(),
2025                    ..Default::default()
2026                },
2027            )
2028            .unwrap();
2029        store
2030            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2031            .unwrap();
2032        let bodies = [
2033            Bytes::from_static(b"part-one"),
2034            Bytes::from_static(b"part-two-longer"),
2035            Bytes::from_static(b"p3"),
2036        ];
2037        for (i, body) in bodies.iter().enumerate() {
2038            let n = (i + 1) as u32;
2039            store
2040                .mpu_put_part(
2041                    "b",
2042                    "up",
2043                    n,
2044                    BodySource::Bytes(body.clone()),
2045                    &format!("et{}", n),
2046                )
2047                .unwrap();
2048        }
2049        let loaded = store.load().unwrap();
2050        let snap = loaded.buckets.get("b").unwrap();
2051        let m = snap.multipart_uploads.get("up").unwrap();
2052        assert_eq!(m.parts.len(), 3);
2053        for (i, body) in bodies.iter().enumerate() {
2054            let n = (i + 1) as u32;
2055            let part = m.parts.get(&n).unwrap();
2056            assert_eq!(part.meta.part_number, n);
2057            assert_eq!(part.meta.size, body.len() as u64);
2058            assert_eq!(part.meta.etag, format!("et{}", n));
2059            match &part.body {
2060                BodyRef::Disk { path, size, .. } => {
2061                    assert_eq!(*size, body.len() as u64);
2062                    assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2063                }
2064                _ => panic!("expected Disk"),
2065            }
2066        }
2067    }
2068
2069    #[test]
2070    fn mpu_abort_removes_upload() {
2071        let tmp = TempDir::new().unwrap();
2072        let store = new_store(&tmp);
2073        store
2074            .put_bucket_meta(
2075                "b",
2076                &BucketMeta {
2077                    name: "b".to_string(),
2078                    ..Default::default()
2079                },
2080            )
2081            .unwrap();
2082        store
2083            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2084            .unwrap();
2085        store
2086            .mpu_put_part(
2087                "b",
2088                "up",
2089                1,
2090                BodySource::Bytes(Bytes::from_static(b"x")),
2091                "e",
2092            )
2093            .unwrap();
2094        store.mpu_abort("b", "up").unwrap();
2095        assert!(!store.mpu_dir("b", "up").exists());
2096        // Idempotent.
2097        store.mpu_abort("b", "up").unwrap();
2098        let loaded = store.load().unwrap();
2099        let snap = loaded.buckets.get("b").unwrap();
2100        assert!(snap.multipart_uploads.is_empty());
2101    }
2102
2103    #[test]
2104    fn mpu_complete_single_part_fast_path() {
2105        let tmp = TempDir::new().unwrap();
2106        let store = new_store(&tmp);
2107        store
2108            .put_bucket_meta(
2109                "b",
2110                &BucketMeta {
2111                    name: "b".to_string(),
2112                    ..Default::default()
2113                },
2114            )
2115            .unwrap();
2116        store
2117            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2118            .unwrap();
2119        let body = Bytes::from_static(b"only-part-bytes");
2120        store
2121            .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2122            .unwrap();
2123        let meta = sample_meta("k", body.len() as u64);
2124        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2125        match &body_ref {
2126            BodyRef::Disk { path, size, .. } => {
2127                assert_eq!(*size, body.len() as u64);
2128                assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2129            }
2130            _ => panic!("expected Disk"),
2131        }
2132        assert!(!store.mpu_dir("b", "up").exists());
2133    }
2134
2135    #[test]
2136    fn mpu_complete_multi_part_concat() {
2137        let tmp = TempDir::new().unwrap();
2138        let store = new_store(&tmp);
2139        store
2140            .put_bucket_meta(
2141                "b",
2142                &BucketMeta {
2143                    name: "b".to_string(),
2144                    ..Default::default()
2145                },
2146            )
2147            .unwrap();
2148        store
2149            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2150            .unwrap();
2151        let p1 = Bytes::from_static(b"AAAA");
2152        let p2 = Bytes::from_static(b"BBBBBB");
2153        let p3 = Bytes::from_static(b"CC");
2154        for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2155            store
2156                .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2157                .unwrap();
2158        }
2159        let mut expected = Vec::new();
2160        expected.extend_from_slice(&p1);
2161        expected.extend_from_slice(&p2);
2162        expected.extend_from_slice(&p3);
2163        let meta = sample_meta("k", expected.len() as u64);
2164        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2165        let path = match body_ref {
2166            BodyRef::Disk { path, size, .. } => {
2167                assert_eq!(size, expected.len() as u64);
2168                path
2169            }
2170            _ => panic!("expected Disk"),
2171        };
2172        assert_eq!(std::fs::read(&path).unwrap(), expected);
2173        assert!(!store.mpu_dir("b", "up").exists());
2174    }
2175
2176    #[test]
2177    fn mpu_complete_large_streaming_via_file_source() {
2178        let tmp = TempDir::new().unwrap();
2179        let store = new_store(&tmp);
2180        store
2181            .put_bucket_meta(
2182                "b",
2183                &BucketMeta {
2184                    name: "b".to_string(),
2185                    ..Default::default()
2186                },
2187            )
2188            .unwrap();
2189        store
2190            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2191            .unwrap();
2192
2193        // 3 parts of ~1 MiB each (kept small so tests stay fast but still
2194        // exercise the BodySource::File + streaming concat path).
2195        const PART_SIZE: usize = 1024 * 1024;
2196        let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2197        let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2198        for (i, byte) in patterns.iter().enumerate() {
2199            let src = tmp.path().join(format!("src-{}.bin", i + 1));
2200            let data = vec![*byte; PART_SIZE];
2201            std::fs::write(&src, &data).unwrap();
2202            expected.extend_from_slice(&data);
2203            store
2204                .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2205                .unwrap();
2206        }
2207        let meta = sample_meta("k", expected.len() as u64);
2208        let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2209        let path = match body_ref {
2210            BodyRef::Disk { path, size, .. } => {
2211                assert_eq!(size, expected.len() as u64);
2212                path
2213            }
2214            _ => panic!("expected Disk"),
2215        };
2216        let actual = std::fs::read(&path).unwrap();
2217        assert_eq!(actual.len(), expected.len());
2218        assert_eq!(actual, expected);
2219        assert!(!store.mpu_dir("b", "up").exists());
2220    }
2221
2222    #[test]
2223    fn mpu_resumable_across_load() {
2224        let tmp = TempDir::new().unwrap();
2225        let store = new_store(&tmp);
2226        store
2227            .put_bucket_meta(
2228                "b",
2229                &BucketMeta {
2230                    name: "b".to_string(),
2231                    ..Default::default()
2232                },
2233            )
2234            .unwrap();
2235        store
2236            .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2237            .unwrap();
2238        let p1 = Bytes::from_static(b"hello-");
2239        let p2 = Bytes::from_static(b"world-");
2240        store
2241            .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2242            .unwrap();
2243        store
2244            .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2245            .unwrap();
2246
2247        // Simulate a restart: re-open the store on the same dir and load.
2248        let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2249        let loaded = store2.load().unwrap();
2250        let snap = loaded.buckets.get("b").unwrap();
2251        let m = snap.multipart_uploads.get("up").unwrap();
2252        assert_eq!(m.parts.len(), 2);
2253        assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2254        assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2255
2256        // Continue the upload on the fresh store.
2257        let p3 = Bytes::from_static(b"again!");
2258        store2
2259            .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2260            .unwrap();
2261        let mut expected = Vec::new();
2262        expected.extend_from_slice(&p1);
2263        expected.extend_from_slice(&p2);
2264        expected.extend_from_slice(&p3);
2265        let meta = sample_meta("k", expected.len() as u64);
2266        let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2267        let path = match body_ref {
2268            BodyRef::Disk { path, .. } => path,
2269            _ => panic!(),
2270        };
2271        assert_eq!(std::fs::read(&path).unwrap(), expected);
2272        assert!(!store2.mpu_dir("b", "up").exists());
2273    }
2274
2275    #[test]
2276    fn tags_snapshot_roundtrip_via_store() {
2277        let tmp = TempDir::new().unwrap();
2278        let store = new_store(&tmp);
2279        store
2280            .put_bucket_meta(
2281                "b",
2282                &BucketMeta {
2283                    name: "b".to_string(),
2284                    ..Default::default()
2285                },
2286            )
2287            .unwrap();
2288        let mut tags = HashMap::new();
2289        tags.insert("env".to_string(), "prod".to_string());
2290        tags.insert("team".to_string(), "s3".to_string());
2291        let snap = TagsSnapshot { tags: tags.clone() };
2292        let payload = toml::to_string(&snap).unwrap();
2293        store
2294            .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2295            .unwrap();
2296        let loaded = store.load().unwrap();
2297        let text = loaded
2298            .buckets
2299            .get("b")
2300            .unwrap()
2301            .subresources
2302            .get("tags.toml")
2303            .cloned()
2304            .unwrap();
2305        let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2306        assert_eq!(decoded.tags, tags);
2307    }
2308
2309    #[test]
2310    fn acl_snapshot_roundtrip_via_store() {
2311        let tmp = TempDir::new().unwrap();
2312        let store = new_store(&tmp);
2313        store
2314            .put_bucket_meta(
2315                "b",
2316                &BucketMeta {
2317                    name: "b".to_string(),
2318                    ..Default::default()
2319                },
2320            )
2321            .unwrap();
2322        let snap = AclSnapshot {
2323            owner_id: "owner-abc".to_string(),
2324            grants: vec![
2325                AclGrantSnapshot {
2326                    grantee_type: "CanonicalUser".to_string(),
2327                    grantee_id: Some("owner-abc".to_string()),
2328                    grantee_display_name: Some("owner".to_string()),
2329                    grantee_uri: None,
2330                    permission: "FULL_CONTROL".to_string(),
2331                },
2332                AclGrantSnapshot {
2333                    grantee_type: "Group".to_string(),
2334                    grantee_id: None,
2335                    grantee_display_name: None,
2336                    grantee_uri: Some(
2337                        "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2338                    ),
2339                    permission: "READ".to_string(),
2340                },
2341            ],
2342        };
2343        let payload = toml::to_string(&snap).unwrap();
2344        store
2345            .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2346            .unwrap();
2347        let loaded = store.load().unwrap();
2348        let text = loaded
2349            .buckets
2350            .get("b")
2351            .unwrap()
2352            .subresources
2353            .get("acl.toml")
2354            .cloned()
2355            .unwrap();
2356        let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2357        assert_eq!(decoded.owner_id, "owner-abc");
2358        assert_eq!(decoded.grants.len(), 2);
2359        assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2360        assert_eq!(decoded.grants[1].grantee_type, "Group");
2361    }
2362
2363    #[test]
2364    fn inventory_snapshot_roundtrip_via_store() {
2365        let tmp = TempDir::new().unwrap();
2366        let store = new_store(&tmp);
2367        store
2368            .put_bucket_meta(
2369                "b",
2370                &BucketMeta {
2371                    name: "b".to_string(),
2372                    ..Default::default()
2373                },
2374            )
2375            .unwrap();
2376        let mut configs = HashMap::new();
2377        configs.insert(
2378            "inv-1".to_string(),
2379            "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2380        );
2381        configs.insert(
2382            "inv-2".to_string(),
2383            "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2384        );
2385        let snap = InventorySnapshot {
2386            configs: configs.clone(),
2387        };
2388        let payload = toml::to_string(&snap).unwrap();
2389        store
2390            .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2391            .unwrap();
2392        let loaded = store.load().unwrap();
2393        let text = loaded
2394            .buckets
2395            .get("b")
2396            .unwrap()
2397            .subresources
2398            .get("inventory.toml")
2399            .cloned()
2400            .unwrap();
2401        let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2402        assert_eq!(decoded.configs, configs);
2403    }
2404
2405    #[test]
2406    fn legacy_versioning_file_is_read() {
2407        let tmp = TempDir::new().unwrap();
2408        let store = new_store(&tmp);
2409        // Bucket meta with no versioning field set.
2410        store
2411            .put_bucket_meta(
2412                "b",
2413                &BucketMeta {
2414                    name: "b".to_string(),
2415                    ..Default::default()
2416                },
2417            )
2418            .unwrap();
2419        // Legacy sidecar: a bare versioning.toml with "Enabled".
2420        let path = store.bucket_dir("b").join("versioning.toml");
2421        std::fs::write(&path, "Enabled").unwrap();
2422        let loaded = store.load().unwrap();
2423        let snap = loaded.buckets.get("b").unwrap();
2424        assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2425    }
2426}