1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11 #[serde(default)]
12 pub account_id: String,
13 #[serde(default)]
14 pub region: String,
15 #[serde(default)]
16 pub buckets: HashMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21 pub meta: BucketMeta,
22 #[serde(default)]
23 pub objects: HashMap<String, LoadedObject>,
24 #[serde(default)]
25 pub object_versions: HashMap<String, Vec<LoadedObject>>,
26 #[serde(default)]
27 pub subresources: HashMap<String, String>,
28 #[serde(default)]
29 pub multipart_uploads: HashMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34 pub init: MpuInit,
35 #[serde(default)]
36 pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41 pub meta: UploadPartMeta,
42 #[serde(default)]
43 pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48 pub meta: ObjectMeta,
49 #[serde(default)]
50 pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55 #[serde(default)]
56 pub name: String,
57 #[serde(default = "default_time")]
58 pub creation_date: DateTime<Utc>,
59 #[serde(default)]
60 pub region: String,
61 #[serde(default)]
62 pub versioning: Option<String>,
63 #[serde(default)]
64 pub acl: Option<String>,
65 #[serde(default)]
66 pub acl_owner_id: String,
67 #[serde(default)]
68 pub accelerate_status: Option<String>,
69 #[serde(default)]
70 pub eventbridge_enabled: bool,
71}
72
73fn default_time() -> DateTime<Utc> {
74 DateTime::<Utc>::MIN_UTC
75}
76
77#[derive(Debug, Default, Clone, Serialize, Deserialize)]
78pub struct AclGrantSnapshot {
79 pub grantee_type: String,
80 #[serde(default)]
81 pub grantee_id: Option<String>,
82 #[serde(default)]
83 pub grantee_display_name: Option<String>,
84 #[serde(default)]
85 pub grantee_uri: Option<String>,
86 pub permission: String,
87}
88
89#[derive(Debug, Default, Clone, Serialize, Deserialize)]
90pub struct TagsSnapshot {
91 #[serde(default)]
92 pub tags: HashMap<String, String>,
93}
94
95#[derive(Debug, Default, Clone, Serialize, Deserialize)]
96pub struct AclSnapshot {
97 #[serde(default)]
98 pub owner_id: String,
99 #[serde(default)]
100 pub grants: Vec<AclGrantSnapshot>,
101}
102
103#[derive(Debug, Default, Clone, Serialize, Deserialize)]
104pub struct InventorySnapshot {
105 #[serde(default)]
106 pub configs: HashMap<String, String>,
107}
108
109#[derive(Debug, Default, Clone, Serialize, Deserialize)]
110pub struct ObjectMeta {
111 #[serde(default)]
112 pub key: String,
113 #[serde(default)]
114 pub content_type: String,
115 #[serde(default)]
116 pub etag: String,
117 #[serde(default)]
118 pub size: u64,
119 #[serde(default = "default_time")]
120 pub last_modified: DateTime<Utc>,
121 #[serde(default)]
122 pub metadata: HashMap<String, String>,
123 #[serde(default)]
124 pub tags: HashMap<String, String>,
125 #[serde(default)]
126 pub storage_class: String,
127 #[serde(default)]
128 pub acl_grants: Vec<AclGrantSnapshot>,
129 #[serde(default)]
130 pub acl_owner_id: Option<String>,
131 #[serde(default)]
132 pub parts_count: Option<u32>,
133 #[serde(default)]
134 pub part_sizes: Option<Vec<(u32, u64)>>,
135 #[serde(default)]
136 pub sse_algorithm: Option<String>,
137 #[serde(default)]
138 pub sse_kms_key_id: Option<String>,
139 #[serde(default)]
140 pub bucket_key_enabled: Option<bool>,
141 #[serde(default)]
142 pub version_id: Option<String>,
143 #[serde(default)]
144 pub is_delete_marker: bool,
145 #[serde(default)]
146 pub restore_ongoing: Option<bool>,
147 #[serde(default)]
148 pub restore_expiry: Option<String>,
149 #[serde(default)]
150 pub checksum_algorithm: Option<String>,
151 #[serde(default)]
152 pub checksum_value: Option<String>,
153 #[serde(default)]
154 pub lock_mode: Option<String>,
155 #[serde(default)]
156 pub lock_retain_until: Option<DateTime<Utc>>,
157 #[serde(default)]
158 pub lock_legal_hold: Option<String>,
159 #[serde(default)]
160 pub content_encoding: Option<String>,
161 #[serde(default)]
162 pub website_redirect_location: Option<String>,
163}
164
165#[derive(Debug, Default, Clone, Serialize, Deserialize)]
166pub struct MpuInit {
167 pub upload_id: String,
168 pub key: String,
169 #[serde(default = "default_time")]
170 pub initiated: DateTime<Utc>,
171 #[serde(default)]
172 pub metadata: HashMap<String, String>,
173 #[serde(default)]
174 pub content_type: String,
175 #[serde(default)]
176 pub storage_class: String,
177 #[serde(default)]
178 pub sse_algorithm: Option<String>,
179 #[serde(default)]
180 pub sse_kms_key_id: Option<String>,
181 #[serde(default)]
182 pub tagging: Option<String>,
183 #[serde(default)]
184 pub acl_grants: Vec<AclGrantSnapshot>,
185 #[serde(default)]
186 pub checksum_algorithm: Option<String>,
187}
188
189#[derive(Debug, Default, Clone, Serialize, Deserialize)]
190pub struct UploadPartMeta {
191 pub part_number: u32,
192 pub etag: String,
193 pub size: u64,
194 #[serde(default = "default_time")]
195 pub last_modified: DateTime<Utc>,
196}
197
198#[derive(Debug, Clone, Copy, PartialEq, Eq)]
199pub enum BucketSubresource {
200 Tags,
201 Lifecycle,
202 Cors,
203 Policy,
204 Notification,
205 Logging,
206 Website,
207 PublicAccessBlock,
208 ObjectLock,
209 Replication,
210 Ownership,
211 Inventory,
212 Encryption,
213 Versioning,
214 Acl,
215 Accelerate,
216 Analytics,
217 IntelligentTiering,
218 Metrics,
219 RequestPayment,
220 Abac,
221 MetadataConfiguration,
222 MetadataTableConfiguration,
223}
224
225pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
226 BucketSubresource::Tags,
227 BucketSubresource::Lifecycle,
228 BucketSubresource::Cors,
229 BucketSubresource::Policy,
230 BucketSubresource::Notification,
231 BucketSubresource::Logging,
232 BucketSubresource::Website,
233 BucketSubresource::PublicAccessBlock,
234 BucketSubresource::ObjectLock,
235 BucketSubresource::Replication,
236 BucketSubresource::Ownership,
237 BucketSubresource::Inventory,
238 BucketSubresource::Encryption,
239 BucketSubresource::Versioning,
240 BucketSubresource::Acl,
241 BucketSubresource::Accelerate,
242 BucketSubresource::Analytics,
243 BucketSubresource::IntelligentTiering,
244 BucketSubresource::Metrics,
245 BucketSubresource::RequestPayment,
246 BucketSubresource::Abac,
247 BucketSubresource::MetadataConfiguration,
248 BucketSubresource::MetadataTableConfiguration,
249];
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub enum BodyRef {
253 #[serde(skip)]
254 Memory(Bytes),
255 Disk {
256 bucket: String,
257 key: String,
258 #[serde(default)]
259 version: Option<String>,
260 path: PathBuf,
261 size: u64,
262 },
263}
264
265impl BodyRef {
266 pub fn size(&self) -> u64 {
267 match self {
268 BodyRef::Memory(b) => b.len() as u64,
269 BodyRef::Disk { size, .. } => *size,
270 }
271 }
272}
273
274impl Default for BodyRef {
275 fn default() -> Self {
276 BodyRef::Memory(Bytes::new())
277 }
278}
279
280#[derive(Debug)]
281pub enum BodySource {
282 Bytes(Bytes),
283 File(PathBuf),
286 FileCopy(PathBuf),
290}
291
292fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
298 match body {
299 BodySource::Bytes(b) => Ok(b),
300 BodySource::File(p) => {
301 let bytes = std::fs::read(&p)?;
302 let _ = std::fs::remove_file(&p);
305 Ok(Bytes::from(bytes))
306 }
307 BodySource::FileCopy(p) => {
308 let bytes = std::fs::read(&p)?;
309 Ok(Bytes::from(bytes))
310 }
311 }
312}
313
314#[derive(Debug, Error)]
315pub enum StoreError {
316 #[error("io error: {0}")]
317 Io(#[from] std::io::Error),
318 #[error("serialization error: {0}")]
319 Serde(String),
320 #[error("not supported by this store")]
321 NotSupported,
322 #[error("{0}")]
323 Other(String),
324}
325
326pub type StoreResult<T> = Result<T, StoreError>;
327
328pub trait S3Store: Send + Sync {
329 fn load(&self) -> StoreResult<S3State>;
330
331 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
332 fn put_bucket_subresource(
333 &self,
334 bucket: &str,
335 kind: BucketSubresource,
336 payload: &str,
337 ) -> StoreResult<()>;
338 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
339 fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
340
341 fn put_object(
342 &self,
343 bucket: &str,
344 key: &str,
345 version: Option<&str>,
346 body: BodySource,
347 meta: &ObjectMeta,
348 ) -> StoreResult<BodyRef>;
349 fn put_object_meta(
350 &self,
351 bucket: &str,
352 key: &str,
353 version: Option<&str>,
354 meta: &ObjectMeta,
355 ) -> StoreResult<()>;
356 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
357 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
358
359 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
360 fn mpu_put_part(
366 &self,
367 bucket: &str,
368 upload_id: &str,
369 part_number: u32,
370 body: BodySource,
371 etag: &str,
372 ) -> StoreResult<BodyRef>;
373 fn spool_dir(&self) -> Option<std::path::PathBuf> {
380 None
381 }
382 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
383 fn mpu_complete(
384 &self,
385 bucket: &str,
386 upload_id: &str,
387 final_key: &str,
388 version: Option<&str>,
389 meta: &ObjectMeta,
390 ) -> StoreResult<BodyRef>;
391}
392
393pub struct MemoryS3Store;
394
395impl MemoryS3Store {
396 pub fn new() -> Self {
397 Self
398 }
399}
400
401impl Default for MemoryS3Store {
402 fn default() -> Self {
403 Self::new()
404 }
405}
406
407impl S3Store for MemoryS3Store {
408 fn load(&self) -> StoreResult<S3State> {
409 Ok(S3State::default())
410 }
411
412 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
413 Ok(())
414 }
415 fn put_bucket_subresource(
416 &self,
417 _bucket: &str,
418 _kind: BucketSubresource,
419 _payload: &str,
420 ) -> StoreResult<()> {
421 Ok(())
422 }
423 fn delete_bucket_subresource(
424 &self,
425 _bucket: &str,
426 _kind: BucketSubresource,
427 ) -> StoreResult<()> {
428 Ok(())
429 }
430 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
431 Ok(())
432 }
433
434 fn put_object(
435 &self,
436 _bucket: &str,
437 _key: &str,
438 _version: Option<&str>,
439 body: BodySource,
440 _meta: &ObjectMeta,
441 ) -> StoreResult<BodyRef> {
442 Ok(BodyRef::Memory(read_body_source(body)?))
443 }
444 fn put_object_meta(
445 &self,
446 _bucket: &str,
447 _key: &str,
448 _version: Option<&str>,
449 _meta: &ObjectMeta,
450 ) -> StoreResult<()> {
451 Ok(())
452 }
453 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
454 Ok(())
455 }
456 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
457 match body {
458 BodyRef::Memory(b) => Ok(b.clone()),
459 BodyRef::Disk { .. } => {
460 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
461 }
462 }
463 }
464
465 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
466 Ok(())
467 }
468 fn mpu_put_part(
469 &self,
470 _bucket: &str,
471 _upload_id: &str,
472 _part_number: u32,
473 body: BodySource,
474 _etag: &str,
475 ) -> StoreResult<BodyRef> {
476 Ok(BodyRef::Memory(read_body_source(body)?))
479 }
480 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
481 Ok(())
482 }
483 fn mpu_complete(
484 &self,
485 _bucket: &str,
486 _upload_id: &str,
487 _final_key: &str,
488 _version: Option<&str>,
489 _meta: &ObjectMeta,
490 ) -> StoreResult<BodyRef> {
491 Ok(BodyRef::Memory(Bytes::new()))
492 }
493}
494
495pub struct DiskS3Store {
496 root: PathBuf,
497 cache: std::sync::Arc<crate::cache::BodyCache>,
498}
499
500impl DiskS3Store {
501 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
502 Self { root, cache }
503 }
504
505 fn buckets_dir(&self) -> PathBuf {
506 self.root.join("buckets")
507 }
508
509 fn bucket_dir(&self, bucket: &str) -> PathBuf {
510 self.buckets_dir()
511 .join(crate::key_escape::escape_key_segment(bucket))
512 }
513
514 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
515 self.bucket_dir(bucket)
516 .join("objects")
517 .join(crate::key_escape::escape_key_segment(key))
518 }
519
520 fn version_tag(version: Option<&str>) -> String {
521 version.unwrap_or("null").to_string()
522 }
523
524 fn object_paths(
525 &self,
526 bucket: &str,
527 key: &str,
528 version: Option<&str>,
529 ) -> (PathBuf, PathBuf, PathBuf) {
530 let dir = self.object_dir(bucket, key);
531 let tag = Self::version_tag(version);
532 let bin = dir.join(format!("{}.bin", tag));
533 let toml = dir.join(format!("{}.toml", tag));
534 (dir, bin, toml)
535 }
536
537 fn subresource_filename(kind: BucketSubresource) -> &'static str {
538 match kind {
539 BucketSubresource::Tags => "tags.toml",
540 BucketSubresource::Lifecycle => "lifecycle.toml",
541 BucketSubresource::Cors => "cors.toml",
542 BucketSubresource::Policy => "policy.toml",
543 BucketSubresource::Notification => "notification.toml",
544 BucketSubresource::Logging => "logging.toml",
545 BucketSubresource::Website => "website.toml",
546 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
547 BucketSubresource::ObjectLock => "object_lock.toml",
548 BucketSubresource::Replication => "replication.toml",
549 BucketSubresource::Ownership => "ownership.toml",
550 BucketSubresource::Inventory => "inventory.toml",
551 BucketSubresource::Encryption => "encryption.toml",
552 BucketSubresource::Versioning => "versioning.toml",
553 BucketSubresource::Acl => "acl.toml",
554 BucketSubresource::Accelerate => "accelerate.toml",
555 BucketSubresource::Analytics => "analytics.toml",
556 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
557 BucketSubresource::Metrics => "metrics.toml",
558 BucketSubresource::RequestPayment => "request_payment.toml",
559 BucketSubresource::Abac => "abac.toml",
560 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
561 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
562 }
563 }
564
565 fn cleanup_empty(dir: &std::path::Path) {
566 let _ = std::fs::remove_dir(dir);
567 }
568
569 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
570 self.bucket_dir(bucket)
571 .join("mpu")
572 .join(crate::key_escape::escape_key_segment(upload_id))
573 }
574
575 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
576 self.mpu_dir(bucket, upload_id).join("parts")
577 }
578
579 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
580 self.mpu_parts_dir(bucket, upload_id)
581 .join(format!("{}.bin", part_number))
582 }
583
584 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
585 self.mpu_parts_dir(bucket, upload_id)
586 .join(format!("{}.toml", part_number))
587 }
588
589 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
590 crate::cache::BodyKey::new(
591 bucket.to_string(),
592 format!("__mpu__/{}", upload_id),
593 Some(format!("part-{}", part_number)),
594 )
595 }
596}
597
598fn io_other(msg: impl Into<String>) -> StoreError {
599 StoreError::Other(msg.into())
600}
601
602impl S3Store for DiskS3Store {
603 fn load(&self) -> StoreResult<S3State> {
604 let mut state = S3State::default();
605 let buckets_dir = self.buckets_dir();
606 if !buckets_dir.exists() {
607 return Ok(state);
608 }
609 for entry in std::fs::read_dir(&buckets_dir)? {
610 let entry = entry?;
611 if !entry.file_type()?.is_dir() {
612 continue;
613 }
614 let bdir = entry.path();
615 let meta_path = bdir.join("meta.toml");
616 if !meta_path.exists() {
617 continue;
618 }
619 let meta_text = std::fs::read_to_string(&meta_path)?;
620 let mut meta: BucketMeta =
621 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
622 let mut snap = BucketSnapshot {
623 meta: meta.clone(),
624 objects: HashMap::new(),
625 object_versions: HashMap::new(),
626 subresources: HashMap::new(),
627 multipart_uploads: HashMap::new(),
628 };
629
630 for kind in ALL_SUBRESOURCES {
631 let fname = Self::subresource_filename(*kind);
632 let path = bdir.join(fname);
633 if path.exists() {
634 let text = std::fs::read_to_string(&path)?;
635 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
636 let stripped = text.trim();
637 if !stripped.is_empty() {
638 snap.meta.versioning = Some(stripped.to_string());
639 meta.versioning = snap.meta.versioning.clone();
640 }
641 }
642 snap.subresources.insert(fname.to_string(), text);
643 }
644 }
645
646 let objects_root = bdir.join("objects");
647 if objects_root.exists() {
648 for okey_entry in std::fs::read_dir(&objects_root)? {
649 let okey_entry = okey_entry?;
650 if !okey_entry.file_type()?.is_dir() {
651 continue;
652 }
653 let key_dir = okey_entry.path();
654 let mut versioned: Vec<LoadedObject> = Vec::new();
655 let mut key_name: Option<String> = None;
656 for version_entry in std::fs::read_dir(&key_dir)? {
657 let version_entry = version_entry?;
658 let path = version_entry.path();
659 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
660 continue;
661 };
662 if !fname.ends_with(".toml") {
663 continue;
664 }
665 let version_tag = &fname[..fname.len() - 5];
666 let toml_text = std::fs::read_to_string(&path)?;
667 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
668 .map_err(|e| StoreError::Serde(e.to_string()))?;
669 let bin_path = key_dir.join(format!("{}.bin", version_tag));
670 let (body, size) = if obj_meta.is_delete_marker {
671 (BodyRef::Memory(Bytes::new()), 0u64)
672 } else if bin_path.exists() {
673 let sz = std::fs::metadata(&bin_path)?.len();
674 (
675 BodyRef::Disk {
676 bucket: meta.name.clone(),
677 key: obj_meta.key.clone(),
678 version: if version_tag == "null" {
679 None
680 } else {
681 Some(version_tag.to_string())
682 },
683 path: bin_path,
684 size: sz,
685 },
686 sz,
687 )
688 } else {
689 return Err(StoreError::Other(format!(
694 "missing body file: {}",
695 bin_path.display()
696 )));
697 };
698 let _ = size;
699 key_name.get_or_insert_with(|| obj_meta.key.clone());
700 if version_tag == "null" && obj_meta.version_id.is_none() {
701 snap.objects.insert(
702 obj_meta.key.clone(),
703 LoadedObject {
704 meta: obj_meta,
705 body,
706 },
707 );
708 } else {
709 versioned.push(LoadedObject {
710 meta: obj_meta,
711 body,
712 });
713 }
714 }
715 if !versioned.is_empty() {
716 versioned.sort_by_key(|v| v.meta.last_modified);
717 if let Some(key) = key_name {
718 match versioned.last() {
725 Some(newest) if newest.meta.is_delete_marker => {
726 snap.objects.remove(&key);
727 }
728 Some(newest) => {
729 snap.objects.insert(key.clone(), newest.clone());
730 }
731 None => {}
732 }
733 snap.object_versions.insert(key, versioned);
734 }
735 }
736 }
737 }
738
739 let mpu_root = bdir.join("mpu");
740 if mpu_root.exists() {
741 for upload_entry in std::fs::read_dir(&mpu_root)? {
742 let upload_entry = upload_entry?;
743 if !upload_entry.file_type()?.is_dir() {
744 continue;
745 }
746 let upload_dir = upload_entry.path();
747 let init_path = upload_dir.join("init.toml");
748 if !init_path.exists() {
749 continue;
750 }
751 let init_text = std::fs::read_to_string(&init_path)?;
752 let init: MpuInit =
753 toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
754 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
755 let parts_dir = upload_dir.join("parts");
756 if parts_dir.exists() {
757 for part_entry in std::fs::read_dir(&parts_dir)? {
758 let part_entry = part_entry?;
759 let path = part_entry.path();
760 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
761 continue;
762 };
763 if !fname.ends_with(".toml") {
764 continue;
765 }
766 let stem = &fname[..fname.len() - 5];
767 let Ok(part_number) = stem.parse::<u32>() else {
768 continue;
769 };
770 let toml_text = std::fs::read_to_string(&path)?;
771 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
772 .map_err(|e| StoreError::Serde(e.to_string()))?;
773 let bin_path = parts_dir.join(format!("{}.bin", part_number));
774 if !bin_path.exists() {
775 return Err(StoreError::Other(format!(
776 "missing multipart part body file: {}",
777 bin_path.display()
778 )));
779 }
780 let sz = std::fs::metadata(&bin_path)?.len();
781 let body = BodyRef::Disk {
782 bucket: meta.name.clone(),
783 key: format!("__mpu__/{}", init.upload_id),
784 version: Some(format!("part-{}", part_number)),
785 path: bin_path,
786 size: sz,
787 };
788 loaded_parts.insert(
789 part_number,
790 LoadedPart {
791 meta: part_meta,
792 body,
793 },
794 );
795 }
796 }
797 snap.multipart_uploads.insert(
798 init.upload_id.clone(),
799 LoadedMpu {
800 init,
801 parts: loaded_parts,
802 },
803 );
804 }
805 }
806
807 state.buckets.insert(meta.name.clone(), snap);
808 }
809 Ok(state)
810 }
811
812 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
813 let dir = self.bucket_dir(bucket);
814 std::fs::create_dir_all(&dir)?;
815 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
816 Ok(())
817 }
818
819 fn put_bucket_subresource(
820 &self,
821 bucket: &str,
822 kind: BucketSubresource,
823 payload: &str,
824 ) -> StoreResult<()> {
825 let dir = self.bucket_dir(bucket);
826 std::fs::create_dir_all(&dir)?;
827 let path = dir.join(Self::subresource_filename(kind));
828 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
829 Ok(())
830 }
831
832 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
833 let path = self
834 .bucket_dir(bucket)
835 .join(Self::subresource_filename(kind));
836 match std::fs::remove_file(&path) {
837 Ok(_) => Ok(()),
838 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
839 Err(e) => Err(e.into()),
840 }
841 }
842
843 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
844 let dir = self.bucket_dir(bucket);
845 match std::fs::remove_dir_all(&dir) {
846 Ok(_) => Ok(()),
847 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
848 Err(e) => Err(e.into()),
849 }
850 }
851
852 fn put_object(
853 &self,
854 bucket: &str,
855 key: &str,
856 version: Option<&str>,
857 body: BodySource,
858 meta: &ObjectMeta,
859 ) -> StoreResult<BodyRef> {
860 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
861 std::fs::create_dir_all(&dir)?;
862
863 if meta.is_delete_marker {
864 crate::atomic::write_atomic_toml(&toml_path, meta)?;
865 return Ok(BodyRef::Memory(Bytes::new()));
866 }
867
868 let size: u64;
869 let bytes_for_cache: Option<Bytes>;
870 match body {
871 BodySource::Bytes(b) => {
872 size = b.len() as u64;
873 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
874 bytes_for_cache = Some(b);
875 }
876 BodySource::File(src) => {
877 let src_size = std::fs::metadata(&src)?.len();
878 size = src_size;
879 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
880 bytes_for_cache = None;
881 }
882 BodySource::FileCopy(src) => {
883 let src_size = std::fs::metadata(&src)?.len();
884 size = src_size;
885 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
886 bytes_for_cache = None;
887 }
888 }
889
890 crate::atomic::write_atomic_toml(&toml_path, meta)?;
891
892 let body_key = crate::cache::BodyKey::new(
893 bucket.to_string(),
894 key.to_string(),
895 version.map(|s| s.to_string()),
896 );
897 if let Some(b) = bytes_for_cache {
898 self.cache.insert(body_key, b);
899 } else {
900 self.cache.invalidate(&crate::cache::BodyKey::new(
901 bucket.to_string(),
902 key.to_string(),
903 version.map(|s| s.to_string()),
904 ));
905 }
906
907 Ok(BodyRef::Disk {
908 bucket: bucket.to_string(),
909 key: key.to_string(),
910 version: version.map(|s| s.to_string()),
911 path: bin_path,
912 size,
913 })
914 }
915
916 fn put_object_meta(
917 &self,
918 bucket: &str,
919 key: &str,
920 version: Option<&str>,
921 meta: &ObjectMeta,
922 ) -> StoreResult<()> {
923 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
924 std::fs::create_dir_all(&dir)?;
925 crate::atomic::write_atomic_toml(&toml_path, meta)?;
926 Ok(())
927 }
928
929 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
930 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
931 for p in [&bin_path, &toml_path] {
932 match std::fs::remove_file(p) {
933 Ok(_) => {}
934 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
935 Err(e) => return Err(e.into()),
936 }
937 }
938 Self::cleanup_empty(&dir);
939
940 self.cache.invalidate(&crate::cache::BodyKey::new(
941 bucket.to_string(),
942 key.to_string(),
943 version.map(|s| s.to_string()),
944 ));
945 Ok(())
946 }
947
948 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
949 match body {
950 BodyRef::Memory(b) => Ok(b.clone()),
951 BodyRef::Disk {
952 bucket,
953 key,
954 version,
955 path,
956 size: _,
957 } => {
958 let body_key =
959 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
960 if let Some(bytes) = self.cache.get(&body_key) {
961 return Ok(bytes);
962 }
963 let bytes = Bytes::from(std::fs::read(path)?);
964 self.cache.insert(body_key, bytes.clone());
965 Ok(bytes)
966 }
967 }
968 }
969
970 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
971 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
972 std::fs::create_dir_all(&parts_dir)?;
973 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
974 crate::atomic::write_atomic_toml(&init_path, init)?;
975 Ok(())
976 }
977
978 fn mpu_put_part(
979 &self,
980 bucket: &str,
981 upload_id: &str,
982 part_number: u32,
983 body: BodySource,
984 etag: &str,
985 ) -> StoreResult<BodyRef> {
986 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
987 std::fs::create_dir_all(&parts_dir)?;
988 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
989 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
990
991 let size: u64 = match body {
992 BodySource::Bytes(b) => {
993 let n = b.len() as u64;
994 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
995 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
996 self.cache.insert(cache_key, b);
997 n
998 }
999 BodySource::File(src) => {
1000 let n = std::fs::metadata(&src)?.len();
1001 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1002 self.cache
1003 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1004 n
1005 }
1006 BodySource::FileCopy(src) => {
1007 let n = std::fs::metadata(&src)?.len();
1008 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1009 self.cache
1010 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1011 n
1012 }
1013 };
1014
1015 let meta = UploadPartMeta {
1016 part_number,
1017 etag: etag.to_string(),
1018 size,
1019 last_modified: Utc::now(),
1020 };
1021 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1022 Ok(BodyRef::Disk {
1025 bucket: bucket.to_string(),
1026 key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1027 version: None,
1028 path: bin_path,
1029 size,
1030 })
1031 }
1032
1033 fn spool_dir(&self) -> Option<std::path::PathBuf> {
1034 let dir = self.root.join(".spool");
1035 let _ = std::fs::create_dir_all(&dir);
1039 Some(dir)
1040 }
1041
1042 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1043 let dir = self.mpu_dir(bucket, upload_id);
1044 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1046 for entry in entries.flatten() {
1047 let path = entry.path();
1048 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1049 if let Some(stem) = fname.strip_suffix(".bin") {
1050 if let Ok(n) = stem.parse::<u32>() {
1051 self.cache
1052 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1053 }
1054 }
1055 }
1056 }
1057 }
1058 match std::fs::remove_dir_all(&dir) {
1059 Ok(_) => Ok(()),
1060 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1061 Err(e) => Err(e.into()),
1062 }
1063 }
1064
1065 fn mpu_complete(
1066 &self,
1067 bucket: &str,
1068 upload_id: &str,
1069 final_key: &str,
1070 version: Option<&str>,
1071 meta: &ObjectMeta,
1072 ) -> StoreResult<BodyRef> {
1073 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1074 if !parts_dir.exists() {
1075 return Err(io_other(format!(
1076 "mpu_complete: no parts dir for upload {}",
1077 upload_id
1078 )));
1079 }
1080
1081 let mut part_numbers: Vec<u32> = Vec::new();
1083 for entry in std::fs::read_dir(&parts_dir)? {
1084 let entry = entry?;
1085 let path = entry.path();
1086 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1087 continue;
1088 };
1089 if let Some(stem) = fname.strip_suffix(".toml") {
1090 if let Ok(n) = stem.parse::<u32>() {
1091 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1092 part_numbers.push(n);
1093 }
1094 }
1095 }
1096 }
1097 part_numbers.sort_unstable();
1098 if part_numbers.is_empty() {
1099 return Err(io_other(format!(
1100 "mpu_complete: upload {} has no parts",
1101 upload_id
1102 )));
1103 }
1104
1105 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1106 std::fs::create_dir_all(&dir)?;
1107
1108 let total_size: u64 = if part_numbers.len() == 1 {
1109 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1110 let sz = std::fs::metadata(&only)?.len();
1111 match std::fs::rename(&only, &bin_path) {
1112 Ok(_) => {}
1113 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1114 {
1116 let mut input = std::fs::File::open(&only)?;
1117 let tmp = {
1118 let mut os = bin_path.as_os_str().to_owned();
1119 os.push(".tmp");
1120 PathBuf::from(os)
1121 };
1122 let mut out = std::fs::OpenOptions::new()
1123 .write(true)
1124 .create(true)
1125 .truncate(true)
1126 .open(&tmp)?;
1127 std::io::copy(&mut input, &mut out)?;
1128 out.sync_all()?;
1129 std::fs::rename(&tmp, &bin_path)?;
1130 }
1131 let _ = std::fs::remove_file(&only);
1132 }
1133 Err(e) => return Err(e.into()),
1134 }
1135 sz
1136 } else {
1137 let tmp = {
1138 let mut os = bin_path.as_os_str().to_owned();
1139 os.push(".tmp");
1140 PathBuf::from(os)
1141 };
1142 let mut total: u64 = 0;
1143 {
1144 let mut out = std::fs::OpenOptions::new()
1145 .write(true)
1146 .create(true)
1147 .truncate(true)
1148 .open(&tmp)?;
1149 for n in &part_numbers {
1150 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1151 let mut input = std::fs::File::open(&part_path)?;
1152 let copied = std::io::copy(&mut input, &mut out)?;
1153 total += copied;
1154 }
1155 out.sync_all()?;
1156 }
1157 std::fs::rename(&tmp, &bin_path)?;
1158 if let Some(parent) = bin_path.parent() {
1159 if let Ok(dir_handle) = std::fs::File::open(parent) {
1160 let _ = dir_handle.sync_all();
1161 }
1162 }
1163 total
1164 };
1165
1166 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1167
1168 for n in &part_numbers {
1170 self.cache
1171 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1172 }
1173 let mpu_dir = self.mpu_dir(bucket, upload_id);
1174 let _ = std::fs::remove_dir_all(&mpu_dir);
1175
1176 self.cache.invalidate(&crate::cache::BodyKey::new(
1181 bucket.to_string(),
1182 final_key.to_string(),
1183 version.map(|s| s.to_string()),
1184 ));
1185
1186 Ok(BodyRef::Disk {
1187 bucket: bucket.to_string(),
1188 key: final_key.to_string(),
1189 version: version.map(|s| s.to_string()),
1190 path: bin_path,
1191 size: total_size,
1192 })
1193 }
1194}
1195
1196fn libc_exdev() -> i32 {
1197 18
1198}
1199
1200#[cfg(test)]
1201mod disk_tests {
1202 use super::*;
1203 use std::sync::Arc;
1204 use tempfile::TempDir;
1205
1206 fn new_store(tmp: &TempDir) -> DiskS3Store {
1207 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1208 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1209 }
1210
1211 fn new_store_with_cache(
1212 tmp: &TempDir,
1213 cap: u64,
1214 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1215 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1216 (
1217 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1218 cache,
1219 )
1220 }
1221
1222 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1223 ObjectMeta {
1224 key: key.to_string(),
1225 content_type: "application/octet-stream".to_string(),
1226 etag: "etag".to_string(),
1227 size,
1228 ..Default::default()
1229 }
1230 }
1231
1232 #[test]
1233 fn put_bucket_meta_roundtrip() {
1234 let tmp = TempDir::new().unwrap();
1235 let store = new_store(&tmp);
1236 let meta = BucketMeta {
1237 name: "b1".to_string(),
1238 region: "us-east-1".to_string(),
1239 versioning: Some("Enabled".to_string()),
1240 ..Default::default()
1241 };
1242 store.put_bucket_meta("b1", &meta).unwrap();
1243 let loaded = store.load().unwrap();
1244 let snap = loaded.buckets.get("b1").unwrap();
1245 assert_eq!(snap.meta.name, "b1");
1246 assert_eq!(snap.meta.region, "us-east-1");
1247 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1248 }
1249
1250 #[test]
1251 fn put_bucket_subresource_each_variant_writes_file() {
1252 let tmp = TempDir::new().unwrap();
1253 let store = new_store(&tmp);
1254 store
1255 .put_bucket_meta(
1256 "b",
1257 &BucketMeta {
1258 name: "b".to_string(),
1259 ..Default::default()
1260 },
1261 )
1262 .unwrap();
1263 let variants = [
1264 BucketSubresource::Tags,
1265 BucketSubresource::Lifecycle,
1266 BucketSubresource::Cors,
1267 BucketSubresource::Policy,
1268 BucketSubresource::Notification,
1269 BucketSubresource::Logging,
1270 BucketSubresource::Website,
1271 BucketSubresource::PublicAccessBlock,
1272 BucketSubresource::ObjectLock,
1273 BucketSubresource::Replication,
1274 BucketSubresource::Ownership,
1275 BucketSubresource::Inventory,
1276 BucketSubresource::Encryption,
1277 BucketSubresource::Versioning,
1278 BucketSubresource::Acl,
1279 BucketSubresource::Accelerate,
1280 ];
1281 for v in variants {
1282 store
1283 .put_bucket_subresource("b", v, "payload=true")
1284 .unwrap();
1285 let file = store
1286 .bucket_dir("b")
1287 .join(DiskS3Store::subresource_filename(v));
1288 assert!(file.exists(), "{:?}", v);
1289 assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1290 }
1291 }
1292
1293 #[test]
1294 fn delete_bucket_subresource_removes_file() {
1295 let tmp = TempDir::new().unwrap();
1296 let store = new_store(&tmp);
1297 store
1298 .put_bucket_meta(
1299 "b",
1300 &BucketMeta {
1301 name: "b".to_string(),
1302 ..Default::default()
1303 },
1304 )
1305 .unwrap();
1306 store
1307 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1308 .unwrap();
1309 store
1310 .delete_bucket_subresource("b", BucketSubresource::Tags)
1311 .unwrap();
1312 let file = store.bucket_dir("b").join("tags.toml");
1313 assert!(!file.exists());
1314 store
1316 .delete_bucket_subresource("b", BucketSubresource::Tags)
1317 .unwrap();
1318 }
1319
1320 #[test]
1321 fn put_object_bytes_roundtrip() {
1322 let tmp = TempDir::new().unwrap();
1323 let store = new_store(&tmp);
1324 store
1325 .put_bucket_meta(
1326 "b",
1327 &BucketMeta {
1328 name: "b".to_string(),
1329 ..Default::default()
1330 },
1331 )
1332 .unwrap();
1333 let data = Bytes::from_static(b"hello world");
1334 let meta = sample_meta("k1", data.len() as u64);
1335 let body_ref = store
1336 .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1337 .unwrap();
1338 match &body_ref {
1339 BodyRef::Disk {
1340 bucket,
1341 key,
1342 size,
1343 path,
1344 ..
1345 } => {
1346 assert_eq!(bucket, "b");
1347 assert_eq!(key, "k1");
1348 assert_eq!(*size, data.len() as u64);
1349 assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1350 }
1351 _ => panic!("expected Disk"),
1352 }
1353 let loaded = store.load().unwrap();
1354 let snap = loaded.buckets.get("b").unwrap();
1355 let obj = snap.objects.get("k1").unwrap();
1356 assert_eq!(obj.meta.size, data.len() as u64);
1357 }
1358
1359 #[test]
1360 fn put_object_file_copy_source_leaves_src_in_place() {
1361 let tmp = TempDir::new().unwrap();
1362 let store = new_store(&tmp);
1363 store
1364 .put_bucket_meta(
1365 "b",
1366 &BucketMeta {
1367 name: "b".to_string(),
1368 ..Default::default()
1369 },
1370 )
1371 .unwrap();
1372 let src_dir = TempDir::new().unwrap();
1373 let src = src_dir.path().join("src.bin");
1374 std::fs::write(&src, b"copied-body").unwrap();
1375 let meta = sample_meta("k", 11);
1376 let bref = store
1377 .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1378 .unwrap();
1379 match bref {
1380 BodyRef::Disk { path, size, .. } => {
1381 assert_eq!(size, 11);
1382 assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1383 }
1384 _ => panic!("expected Disk bodyref"),
1385 }
1386 assert!(src.exists(), "source file must not be moved by FileCopy");
1387 assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1388 }
1389
1390 #[test]
1391 fn put_object_file_source() {
1392 let tmp = TempDir::new().unwrap();
1393 let store = new_store(&tmp);
1394 store
1395 .put_bucket_meta(
1396 "b",
1397 &BucketMeta {
1398 name: "b".to_string(),
1399 ..Default::default()
1400 },
1401 )
1402 .unwrap();
1403 let src = tmp.path().join("src.bin");
1404 std::fs::write(&src, b"file-body").unwrap();
1405 let meta = sample_meta("k", 9);
1406 let body_ref = store
1407 .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1408 .unwrap();
1409 let path = match body_ref {
1410 BodyRef::Disk { path, .. } => path,
1411 _ => panic!(),
1412 };
1413 assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1414 }
1415
1416 #[test]
1417 fn put_object_meta_only_keeps_bin() {
1418 let tmp = TempDir::new().unwrap();
1419 let store = new_store(&tmp);
1420 store
1421 .put_bucket_meta(
1422 "b",
1423 &BucketMeta {
1424 name: "b".to_string(),
1425 ..Default::default()
1426 },
1427 )
1428 .unwrap();
1429 let data = Bytes::from_static(b"abc");
1430 let mut meta = sample_meta("k", 3);
1431 store
1432 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1433 .unwrap();
1434 let (_, bin, _) = store.object_paths("b", "k", None);
1435 let before = std::fs::read(&bin).unwrap();
1436 meta.tags.insert("x".to_string(), "y".to_string());
1437 store.put_object_meta("b", "k", None, &meta).unwrap();
1438 assert_eq!(std::fs::read(&bin).unwrap(), before);
1439 let loaded = store.load().unwrap();
1440 let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1441 assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1442 }
1443
1444 #[test]
1445 fn delete_object_cleans_up_files_and_cache() {
1446 let tmp = TempDir::new().unwrap();
1447 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1448 store
1449 .put_bucket_meta(
1450 "b",
1451 &BucketMeta {
1452 name: "b".to_string(),
1453 ..Default::default()
1454 },
1455 )
1456 .unwrap();
1457 let data = Bytes::from_static(b"bye");
1458 let meta = sample_meta("k", 3);
1459 store
1460 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1461 .unwrap();
1462 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1463 assert!(cache.get(&body_key).is_some());
1464 store.delete_object("b", "k", None).unwrap();
1465 let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1466 assert!(!bin.exists());
1467 assert!(!toml_path.exists());
1468 assert!(!dir.exists());
1469 assert!(cache.get(&body_key).is_none());
1470 }
1471
1472 #[test]
1473 fn open_object_body_cache_hit_and_refill() {
1474 let tmp = TempDir::new().unwrap();
1475 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1476 store
1477 .put_bucket_meta(
1478 "b",
1479 &BucketMeta {
1480 name: "b".to_string(),
1481 ..Default::default()
1482 },
1483 )
1484 .unwrap();
1485 let data = Bytes::from_static(b"payload");
1486 let meta = sample_meta("k", data.len() as u64);
1487 let body_ref = store
1488 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1489 .unwrap();
1490 let got = store.open_object_body(&body_ref).unwrap();
1492 assert_eq!(got, data);
1493 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1495 cache.invalidate(&body_key);
1496 assert!(cache.get(&body_key).is_none());
1497 let got = store.open_object_body(&body_ref).unwrap();
1498 assert_eq!(got, data);
1499 assert!(cache.get(&body_key).is_some());
1500 }
1501
1502 #[test]
1503 fn open_object_body_large_bypasses_cache() {
1504 let tmp = TempDir::new().unwrap();
1505 let (store, cache) = new_store_with_cache(&tmp, 1024);
1507 store
1508 .put_bucket_meta(
1509 "b",
1510 &BucketMeta {
1511 name: "b".to_string(),
1512 ..Default::default()
1513 },
1514 )
1515 .unwrap();
1516 let data = Bytes::from(vec![7u8; 800]);
1517 let meta = sample_meta("big", 800);
1518 let body_ref = store
1519 .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1520 .unwrap();
1521 let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1522 assert!(cache.get(&body_key).is_none());
1523 let got = store.open_object_body(&body_ref).unwrap();
1524 assert_eq!(got, data);
1525 assert!(cache.get(&body_key).is_none());
1527 }
1528
1529 #[test]
1530 fn load_empty_dir() {
1531 let tmp = TempDir::new().unwrap();
1532 let store = new_store(&tmp);
1533 let s = store.load().unwrap();
1534 assert!(s.buckets.is_empty());
1535 }
1536
1537 #[test]
1538 fn load_skips_mpu_without_init() {
1539 let tmp = TempDir::new().unwrap();
1540 let store = new_store(&tmp);
1541 store
1542 .put_bucket_meta(
1543 "b",
1544 &BucketMeta {
1545 name: "b".to_string(),
1546 ..Default::default()
1547 },
1548 )
1549 .unwrap();
1550 let data = Bytes::from_static(b"x");
1551 let meta = sample_meta("k", 1);
1552 store
1553 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1554 .unwrap();
1555 let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1557 std::fs::create_dir_all(&mpu).unwrap();
1558
1559 let loaded = store.load().unwrap();
1560 let snap = loaded.buckets.get("b").unwrap();
1561 assert_eq!(snap.objects.len(), 1);
1562 assert!(snap.multipart_uploads.is_empty());
1563 }
1564
1565 #[test]
1566 fn load_reads_bucket_subresources() {
1567 let tmp = TempDir::new().unwrap();
1568 let store = new_store(&tmp);
1569 store
1570 .put_bucket_meta(
1571 "b",
1572 &BucketMeta {
1573 name: "b".to_string(),
1574 ..Default::default()
1575 },
1576 )
1577 .unwrap();
1578 store
1579 .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1580 .unwrap();
1581 store
1582 .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1583 .unwrap();
1584 store
1585 .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1586 .unwrap();
1587 store
1588 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1589 .unwrap();
1590 let loaded = store.load().unwrap();
1591 let snap = loaded.buckets.get("b").unwrap();
1592 assert_eq!(
1593 snap.subresources.get("lifecycle.toml").map(String::as_str),
1594 Some("<Lifecycle/>"),
1595 );
1596 assert_eq!(
1597 snap.subresources.get("cors.toml").map(String::as_str),
1598 Some("<Cors/>"),
1599 );
1600 assert_eq!(
1601 snap.subresources.get("policy.toml").map(String::as_str),
1602 Some("{}"),
1603 );
1604 assert_eq!(
1605 snap.subresources.get("tags.toml").map(String::as_str),
1606 Some("x=1"),
1607 );
1608 }
1609
1610 #[test]
1611 fn versioned_put_load_roundtrip() {
1612 let tmp = TempDir::new().unwrap();
1613 let store = new_store(&tmp);
1614 store
1615 .put_bucket_meta(
1616 "b",
1617 &BucketMeta {
1618 name: "b".to_string(),
1619 versioning: Some("Enabled".to_string()),
1620 ..Default::default()
1621 },
1622 )
1623 .unwrap();
1624 let base = chrono::Utc::now();
1625 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1626 .iter()
1627 .enumerate()
1628 {
1629 let mut m = sample_meta("k", body.len() as u64);
1630 m.version_id = Some((*vid).to_string());
1631 m.last_modified = base + chrono::Duration::seconds(i as i64);
1632 store
1633 .put_object(
1634 "b",
1635 "k",
1636 Some(*vid),
1637 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1638 &m,
1639 )
1640 .unwrap();
1641 }
1642 let loaded = store.load().unwrap();
1643 let snap = loaded.buckets.get("b").unwrap();
1644 let versions = snap.object_versions.get("k").expect("versions present");
1645 assert_eq!(versions.len(), 3);
1646 assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1647 assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1648 assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1649 for v in versions {
1650 match &v.body {
1651 BodyRef::Disk { size, .. } => assert!(*size > 0),
1652 _ => panic!("expected Disk body"),
1653 }
1654 }
1655 }
1656
1657 #[test]
1658 fn versioned_load_promotes_latest_live_to_snap_objects() {
1659 let tmp = TempDir::new().unwrap();
1660 let store = new_store(&tmp);
1661 store
1662 .put_bucket_meta(
1663 "b",
1664 &BucketMeta {
1665 name: "b".to_string(),
1666 versioning: Some("Enabled".to_string()),
1667 ..Default::default()
1668 },
1669 )
1670 .unwrap();
1671 let base = chrono::Utc::now();
1672 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1673 .iter()
1674 .enumerate()
1675 {
1676 let mut m = sample_meta("k", body.len() as u64);
1677 m.version_id = Some((*vid).to_string());
1678 m.last_modified = base + chrono::Duration::seconds(i as i64);
1679 store
1680 .put_object(
1681 "b",
1682 "k",
1683 Some(*vid),
1684 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1685 &m,
1686 )
1687 .unwrap();
1688 }
1689 let loaded = store.load().unwrap();
1690 let snap = loaded.buckets.get("b").unwrap();
1691 let current = snap.objects.get("k").expect("current object promoted");
1692 assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1693 }
1694
1695 #[test]
1696 fn versioned_load_trailing_delete_marker_hides_current() {
1697 let tmp = TempDir::new().unwrap();
1698 let store = new_store(&tmp);
1699 store
1700 .put_bucket_meta(
1701 "b",
1702 &BucketMeta {
1703 name: "b".to_string(),
1704 versioning: Some("Enabled".to_string()),
1705 ..Default::default()
1706 },
1707 )
1708 .unwrap();
1709 let base = chrono::Utc::now();
1710 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1711 .iter()
1712 .enumerate()
1713 {
1714 let mut m = sample_meta("k", body.len() as u64);
1715 m.version_id = Some((*vid).to_string());
1716 m.last_modified = base + chrono::Duration::seconds(i as i64);
1717 store
1718 .put_object(
1719 "b",
1720 "k",
1721 Some(*vid),
1722 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1723 &m,
1724 )
1725 .unwrap();
1726 }
1727 let mut dm = sample_meta("k", 0);
1729 dm.version_id = Some("dm1".to_string());
1730 dm.is_delete_marker = true;
1731 dm.last_modified = base + chrono::Duration::seconds(10);
1732 store
1733 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1734 .unwrap();
1735 let loaded = store.load().unwrap();
1736 let snap = loaded.buckets.get("b").unwrap();
1737 assert!(
1738 !snap.objects.contains_key("k"),
1739 "trailing delete marker must hide current object",
1740 );
1741 assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1742 }
1743
1744 #[test]
1745 fn legacy_null_object_overridden_by_newer_versions() {
1746 let tmp = TempDir::new().unwrap();
1749 let store = new_store(&tmp);
1750 store
1751 .put_bucket_meta(
1752 "b",
1753 &BucketMeta {
1754 name: "b".to_string(),
1755 ..Default::default()
1756 },
1757 )
1758 .unwrap();
1759 let base = chrono::Utc::now();
1760 let mut null_meta = sample_meta("k", 3);
1761 null_meta.last_modified = base;
1762 store
1763 .put_object(
1764 "b",
1765 "k",
1766 None,
1767 BodySource::Bytes(Bytes::from_static(b"old")),
1768 &null_meta,
1769 )
1770 .unwrap();
1771 store
1773 .put_bucket_meta(
1774 "b",
1775 &BucketMeta {
1776 name: "b".to_string(),
1777 versioning: Some("Enabled".to_string()),
1778 ..Default::default()
1779 },
1780 )
1781 .unwrap();
1782 let mut v1 = sample_meta("k", 3);
1783 v1.version_id = Some("v1".to_string());
1784 v1.last_modified = base + chrono::Duration::seconds(1);
1785 store
1786 .put_object(
1787 "b",
1788 "k",
1789 Some("v1"),
1790 BodySource::Bytes(Bytes::from_static(b"new")),
1791 &v1,
1792 )
1793 .unwrap();
1794 let mut v2 = sample_meta("k", 5);
1795 v2.version_id = Some("v2".to_string());
1796 v2.last_modified = base + chrono::Duration::seconds(2);
1797 store
1798 .put_object(
1799 "b",
1800 "k",
1801 Some("v2"),
1802 BodySource::Bytes(Bytes::from_static(b"newer")),
1803 &v2,
1804 )
1805 .unwrap();
1806 let loaded = store.load().unwrap();
1807 let snap = loaded.buckets.get("b").unwrap();
1808 let current = snap
1809 .objects
1810 .get("k")
1811 .expect("latest live version must override stale null");
1812 assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1813 assert_eq!(current.meta.size, 5);
1814 }
1815
1816 #[test]
1817 fn legacy_null_hidden_by_trailing_delete_marker() {
1818 let tmp = TempDir::new().unwrap();
1819 let store = new_store(&tmp);
1820 store
1821 .put_bucket_meta(
1822 "b",
1823 &BucketMeta {
1824 name: "b".to_string(),
1825 ..Default::default()
1826 },
1827 )
1828 .unwrap();
1829 let base = chrono::Utc::now();
1830 let mut null_meta = sample_meta("k", 3);
1831 null_meta.last_modified = base;
1832 store
1833 .put_object(
1834 "b",
1835 "k",
1836 None,
1837 BodySource::Bytes(Bytes::from_static(b"old")),
1838 &null_meta,
1839 )
1840 .unwrap();
1841 store
1842 .put_bucket_meta(
1843 "b",
1844 &BucketMeta {
1845 name: "b".to_string(),
1846 versioning: Some("Enabled".to_string()),
1847 ..Default::default()
1848 },
1849 )
1850 .unwrap();
1851 let mut v1 = sample_meta("k", 3);
1852 v1.version_id = Some("v1".to_string());
1853 v1.last_modified = base + chrono::Duration::seconds(1);
1854 store
1855 .put_object(
1856 "b",
1857 "k",
1858 Some("v1"),
1859 BodySource::Bytes(Bytes::from_static(b"new")),
1860 &v1,
1861 )
1862 .unwrap();
1863 let mut dm = sample_meta("k", 0);
1864 dm.version_id = Some("dm1".to_string());
1865 dm.is_delete_marker = true;
1866 dm.last_modified = base + chrono::Duration::seconds(2);
1867 store
1868 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1869 .unwrap();
1870 let loaded = store.load().unwrap();
1871 let snap = loaded.buckets.get("b").unwrap();
1872 assert!(
1873 !snap.objects.contains_key("k"),
1874 "trailing delete marker must hide even a legacy null object",
1875 );
1876 }
1877
1878 #[test]
1879 fn delete_marker_roundtrip_no_body_file() {
1880 let tmp = TempDir::new().unwrap();
1881 let store = new_store(&tmp);
1882 store
1883 .put_bucket_meta(
1884 "b",
1885 &BucketMeta {
1886 name: "b".to_string(),
1887 versioning: Some("Enabled".to_string()),
1888 ..Default::default()
1889 },
1890 )
1891 .unwrap();
1892 let mut m = sample_meta("k", 0);
1893 m.version_id = Some("dm1".to_string());
1894 m.is_delete_marker = true;
1895 store
1896 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1897 .unwrap();
1898 let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1900 assert!(!bin.exists(), "delete marker must not have a .bin file");
1901 assert!(toml_path.exists());
1902
1903 let loaded = store.load().unwrap();
1904 let versions = loaded
1905 .buckets
1906 .get("b")
1907 .unwrap()
1908 .object_versions
1909 .get("k")
1910 .unwrap();
1911 assert_eq!(versions.len(), 1);
1912 assert!(versions[0].meta.is_delete_marker);
1913 match &versions[0].body {
1914 BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1915 _ => panic!("delete marker body should be empty Memory"),
1916 }
1917 }
1918
1919 #[test]
1920 fn mixed_nonversioned_and_versioned_buckets() {
1921 let tmp = TempDir::new().unwrap();
1922 let store = new_store(&tmp);
1923 store
1924 .put_bucket_meta(
1925 "a",
1926 &BucketMeta {
1927 name: "a".to_string(),
1928 ..Default::default()
1929 },
1930 )
1931 .unwrap();
1932 store
1933 .put_bucket_meta(
1934 "b",
1935 &BucketMeta {
1936 name: "b".to_string(),
1937 versioning: Some("Enabled".to_string()),
1938 ..Default::default()
1939 },
1940 )
1941 .unwrap();
1942 let ma = sample_meta("only", 3);
1943 store
1944 .put_object(
1945 "a",
1946 "only",
1947 None,
1948 BodySource::Bytes(Bytes::from_static(b"aaa")),
1949 &ma,
1950 )
1951 .unwrap();
1952 let base = chrono::Utc::now();
1953 for (i, vid) in ["v1", "v2"].iter().enumerate() {
1954 let mut m = sample_meta("twice", 2);
1955 m.version_id = Some((*vid).to_string());
1956 m.last_modified = base + chrono::Duration::seconds(i as i64);
1957 store
1958 .put_object(
1959 "b",
1960 "twice",
1961 Some(*vid),
1962 BodySource::Bytes(Bytes::from_static(b"xx")),
1963 &m,
1964 )
1965 .unwrap();
1966 }
1967 let loaded = store.load().unwrap();
1968 assert_eq!(loaded.buckets.len(), 2);
1969 let a = loaded.buckets.get("a").unwrap();
1970 assert_eq!(a.objects.len(), 1);
1971 assert!(a.object_versions.is_empty());
1972 let b = loaded.buckets.get("b").unwrap();
1973 assert_eq!(
1976 b.objects.get("twice").unwrap().meta.version_id.as_deref(),
1977 Some("v2")
1978 );
1979 assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
1980 }
1981
1982 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1983 MpuInit {
1984 upload_id: upload_id.to_string(),
1985 key: key.to_string(),
1986 initiated: chrono::Utc::now(),
1987 content_type: "application/octet-stream".to_string(),
1988 storage_class: "STANDARD".to_string(),
1989 ..Default::default()
1990 }
1991 }
1992
1993 #[test]
1994 fn mpu_create_then_load_empty_parts() {
1995 let tmp = TempDir::new().unwrap();
1996 let store = new_store(&tmp);
1997 store
1998 .put_bucket_meta(
1999 "b",
2000 &BucketMeta {
2001 name: "b".to_string(),
2002 ..Default::default()
2003 },
2004 )
2005 .unwrap();
2006 let init = sample_mpu_init("up1", "k1");
2007 store.mpu_create("b", "up1", &init).unwrap();
2008 let loaded = store.load().unwrap();
2009 let snap = loaded.buckets.get("b").unwrap();
2010 let m = snap.multipart_uploads.get("up1").expect("upload present");
2011 assert_eq!(m.init.upload_id, "up1");
2012 assert_eq!(m.init.key, "k1");
2013 assert!(m.parts.is_empty());
2014 }
2015
2016 #[test]
2017 fn mpu_put_part_then_load_three_parts() {
2018 let tmp = TempDir::new().unwrap();
2019 let store = new_store(&tmp);
2020 store
2021 .put_bucket_meta(
2022 "b",
2023 &BucketMeta {
2024 name: "b".to_string(),
2025 ..Default::default()
2026 },
2027 )
2028 .unwrap();
2029 store
2030 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2031 .unwrap();
2032 let bodies = [
2033 Bytes::from_static(b"part-one"),
2034 Bytes::from_static(b"part-two-longer"),
2035 Bytes::from_static(b"p3"),
2036 ];
2037 for (i, body) in bodies.iter().enumerate() {
2038 let n = (i + 1) as u32;
2039 store
2040 .mpu_put_part(
2041 "b",
2042 "up",
2043 n,
2044 BodySource::Bytes(body.clone()),
2045 &format!("et{}", n),
2046 )
2047 .unwrap();
2048 }
2049 let loaded = store.load().unwrap();
2050 let snap = loaded.buckets.get("b").unwrap();
2051 let m = snap.multipart_uploads.get("up").unwrap();
2052 assert_eq!(m.parts.len(), 3);
2053 for (i, body) in bodies.iter().enumerate() {
2054 let n = (i + 1) as u32;
2055 let part = m.parts.get(&n).unwrap();
2056 assert_eq!(part.meta.part_number, n);
2057 assert_eq!(part.meta.size, body.len() as u64);
2058 assert_eq!(part.meta.etag, format!("et{}", n));
2059 match &part.body {
2060 BodyRef::Disk { path, size, .. } => {
2061 assert_eq!(*size, body.len() as u64);
2062 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2063 }
2064 _ => panic!("expected Disk"),
2065 }
2066 }
2067 }
2068
2069 #[test]
2070 fn mpu_abort_removes_upload() {
2071 let tmp = TempDir::new().unwrap();
2072 let store = new_store(&tmp);
2073 store
2074 .put_bucket_meta(
2075 "b",
2076 &BucketMeta {
2077 name: "b".to_string(),
2078 ..Default::default()
2079 },
2080 )
2081 .unwrap();
2082 store
2083 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2084 .unwrap();
2085 store
2086 .mpu_put_part(
2087 "b",
2088 "up",
2089 1,
2090 BodySource::Bytes(Bytes::from_static(b"x")),
2091 "e",
2092 )
2093 .unwrap();
2094 store.mpu_abort("b", "up").unwrap();
2095 assert!(!store.mpu_dir("b", "up").exists());
2096 store.mpu_abort("b", "up").unwrap();
2098 let loaded = store.load().unwrap();
2099 let snap = loaded.buckets.get("b").unwrap();
2100 assert!(snap.multipart_uploads.is_empty());
2101 }
2102
2103 #[test]
2104 fn mpu_complete_single_part_fast_path() {
2105 let tmp = TempDir::new().unwrap();
2106 let store = new_store(&tmp);
2107 store
2108 .put_bucket_meta(
2109 "b",
2110 &BucketMeta {
2111 name: "b".to_string(),
2112 ..Default::default()
2113 },
2114 )
2115 .unwrap();
2116 store
2117 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2118 .unwrap();
2119 let body = Bytes::from_static(b"only-part-bytes");
2120 store
2121 .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2122 .unwrap();
2123 let meta = sample_meta("k", body.len() as u64);
2124 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2125 match &body_ref {
2126 BodyRef::Disk { path, size, .. } => {
2127 assert_eq!(*size, body.len() as u64);
2128 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2129 }
2130 _ => panic!("expected Disk"),
2131 }
2132 assert!(!store.mpu_dir("b", "up").exists());
2133 }
2134
2135 #[test]
2136 fn mpu_complete_multi_part_concat() {
2137 let tmp = TempDir::new().unwrap();
2138 let store = new_store(&tmp);
2139 store
2140 .put_bucket_meta(
2141 "b",
2142 &BucketMeta {
2143 name: "b".to_string(),
2144 ..Default::default()
2145 },
2146 )
2147 .unwrap();
2148 store
2149 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2150 .unwrap();
2151 let p1 = Bytes::from_static(b"AAAA");
2152 let p2 = Bytes::from_static(b"BBBBBB");
2153 let p3 = Bytes::from_static(b"CC");
2154 for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2155 store
2156 .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2157 .unwrap();
2158 }
2159 let mut expected = Vec::new();
2160 expected.extend_from_slice(&p1);
2161 expected.extend_from_slice(&p2);
2162 expected.extend_from_slice(&p3);
2163 let meta = sample_meta("k", expected.len() as u64);
2164 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2165 let path = match body_ref {
2166 BodyRef::Disk { path, size, .. } => {
2167 assert_eq!(size, expected.len() as u64);
2168 path
2169 }
2170 _ => panic!("expected Disk"),
2171 };
2172 assert_eq!(std::fs::read(&path).unwrap(), expected);
2173 assert!(!store.mpu_dir("b", "up").exists());
2174 }
2175
2176 #[test]
2177 fn mpu_complete_large_streaming_via_file_source() {
2178 let tmp = TempDir::new().unwrap();
2179 let store = new_store(&tmp);
2180 store
2181 .put_bucket_meta(
2182 "b",
2183 &BucketMeta {
2184 name: "b".to_string(),
2185 ..Default::default()
2186 },
2187 )
2188 .unwrap();
2189 store
2190 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2191 .unwrap();
2192
2193 const PART_SIZE: usize = 1024 * 1024;
2196 let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2197 let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2198 for (i, byte) in patterns.iter().enumerate() {
2199 let src = tmp.path().join(format!("src-{}.bin", i + 1));
2200 let data = vec![*byte; PART_SIZE];
2201 std::fs::write(&src, &data).unwrap();
2202 expected.extend_from_slice(&data);
2203 store
2204 .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2205 .unwrap();
2206 }
2207 let meta = sample_meta("k", expected.len() as u64);
2208 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2209 let path = match body_ref {
2210 BodyRef::Disk { path, size, .. } => {
2211 assert_eq!(size, expected.len() as u64);
2212 path
2213 }
2214 _ => panic!("expected Disk"),
2215 };
2216 let actual = std::fs::read(&path).unwrap();
2217 assert_eq!(actual.len(), expected.len());
2218 assert_eq!(actual, expected);
2219 assert!(!store.mpu_dir("b", "up").exists());
2220 }
2221
2222 #[test]
2223 fn mpu_resumable_across_load() {
2224 let tmp = TempDir::new().unwrap();
2225 let store = new_store(&tmp);
2226 store
2227 .put_bucket_meta(
2228 "b",
2229 &BucketMeta {
2230 name: "b".to_string(),
2231 ..Default::default()
2232 },
2233 )
2234 .unwrap();
2235 store
2236 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2237 .unwrap();
2238 let p1 = Bytes::from_static(b"hello-");
2239 let p2 = Bytes::from_static(b"world-");
2240 store
2241 .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2242 .unwrap();
2243 store
2244 .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2245 .unwrap();
2246
2247 let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2249 let loaded = store2.load().unwrap();
2250 let snap = loaded.buckets.get("b").unwrap();
2251 let m = snap.multipart_uploads.get("up").unwrap();
2252 assert_eq!(m.parts.len(), 2);
2253 assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2254 assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2255
2256 let p3 = Bytes::from_static(b"again!");
2258 store2
2259 .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2260 .unwrap();
2261 let mut expected = Vec::new();
2262 expected.extend_from_slice(&p1);
2263 expected.extend_from_slice(&p2);
2264 expected.extend_from_slice(&p3);
2265 let meta = sample_meta("k", expected.len() as u64);
2266 let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2267 let path = match body_ref {
2268 BodyRef::Disk { path, .. } => path,
2269 _ => panic!(),
2270 };
2271 assert_eq!(std::fs::read(&path).unwrap(), expected);
2272 assert!(!store2.mpu_dir("b", "up").exists());
2273 }
2274
2275 #[test]
2276 fn tags_snapshot_roundtrip_via_store() {
2277 let tmp = TempDir::new().unwrap();
2278 let store = new_store(&tmp);
2279 store
2280 .put_bucket_meta(
2281 "b",
2282 &BucketMeta {
2283 name: "b".to_string(),
2284 ..Default::default()
2285 },
2286 )
2287 .unwrap();
2288 let mut tags = HashMap::new();
2289 tags.insert("env".to_string(), "prod".to_string());
2290 tags.insert("team".to_string(), "s3".to_string());
2291 let snap = TagsSnapshot { tags: tags.clone() };
2292 let payload = toml::to_string(&snap).unwrap();
2293 store
2294 .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2295 .unwrap();
2296 let loaded = store.load().unwrap();
2297 let text = loaded
2298 .buckets
2299 .get("b")
2300 .unwrap()
2301 .subresources
2302 .get("tags.toml")
2303 .cloned()
2304 .unwrap();
2305 let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2306 assert_eq!(decoded.tags, tags);
2307 }
2308
2309 #[test]
2310 fn acl_snapshot_roundtrip_via_store() {
2311 let tmp = TempDir::new().unwrap();
2312 let store = new_store(&tmp);
2313 store
2314 .put_bucket_meta(
2315 "b",
2316 &BucketMeta {
2317 name: "b".to_string(),
2318 ..Default::default()
2319 },
2320 )
2321 .unwrap();
2322 let snap = AclSnapshot {
2323 owner_id: "owner-abc".to_string(),
2324 grants: vec![
2325 AclGrantSnapshot {
2326 grantee_type: "CanonicalUser".to_string(),
2327 grantee_id: Some("owner-abc".to_string()),
2328 grantee_display_name: Some("owner".to_string()),
2329 grantee_uri: None,
2330 permission: "FULL_CONTROL".to_string(),
2331 },
2332 AclGrantSnapshot {
2333 grantee_type: "Group".to_string(),
2334 grantee_id: None,
2335 grantee_display_name: None,
2336 grantee_uri: Some(
2337 "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2338 ),
2339 permission: "READ".to_string(),
2340 },
2341 ],
2342 };
2343 let payload = toml::to_string(&snap).unwrap();
2344 store
2345 .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2346 .unwrap();
2347 let loaded = store.load().unwrap();
2348 let text = loaded
2349 .buckets
2350 .get("b")
2351 .unwrap()
2352 .subresources
2353 .get("acl.toml")
2354 .cloned()
2355 .unwrap();
2356 let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2357 assert_eq!(decoded.owner_id, "owner-abc");
2358 assert_eq!(decoded.grants.len(), 2);
2359 assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2360 assert_eq!(decoded.grants[1].grantee_type, "Group");
2361 }
2362
2363 #[test]
2364 fn inventory_snapshot_roundtrip_via_store() {
2365 let tmp = TempDir::new().unwrap();
2366 let store = new_store(&tmp);
2367 store
2368 .put_bucket_meta(
2369 "b",
2370 &BucketMeta {
2371 name: "b".to_string(),
2372 ..Default::default()
2373 },
2374 )
2375 .unwrap();
2376 let mut configs = HashMap::new();
2377 configs.insert(
2378 "inv-1".to_string(),
2379 "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2380 );
2381 configs.insert(
2382 "inv-2".to_string(),
2383 "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2384 );
2385 let snap = InventorySnapshot {
2386 configs: configs.clone(),
2387 };
2388 let payload = toml::to_string(&snap).unwrap();
2389 store
2390 .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2391 .unwrap();
2392 let loaded = store.load().unwrap();
2393 let text = loaded
2394 .buckets
2395 .get("b")
2396 .unwrap()
2397 .subresources
2398 .get("inventory.toml")
2399 .cloned()
2400 .unwrap();
2401 let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2402 assert_eq!(decoded.configs, configs);
2403 }
2404
2405 #[test]
2406 fn legacy_versioning_file_is_read() {
2407 let tmp = TempDir::new().unwrap();
2408 let store = new_store(&tmp);
2409 store
2411 .put_bucket_meta(
2412 "b",
2413 &BucketMeta {
2414 name: "b".to_string(),
2415 ..Default::default()
2416 },
2417 )
2418 .unwrap();
2419 let path = store.bucket_dir("b").join("versioning.toml");
2421 std::fs::write(&path, "Enabled").unwrap();
2422 let loaded = store.load().unwrap();
2423 let snap = loaded.buckets.get("b").unwrap();
2424 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2425 }
2426}