1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11 #[serde(default)]
12 pub account_id: String,
13 #[serde(default)]
14 pub region: String,
15 #[serde(default)]
16 pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21 pub meta: BucketMeta,
22 #[serde(default)]
23 pub objects: BTreeMap<String, LoadedObject>,
24 #[serde(default)]
25 pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
26 #[serde(default)]
27 pub subresources: BTreeMap<String, String>,
28 #[serde(default)]
29 pub multipart_uploads: BTreeMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34 pub init: MpuInit,
35 #[serde(default)]
36 pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41 pub meta: UploadPartMeta,
42 #[serde(default)]
43 pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48 pub meta: ObjectMeta,
49 #[serde(default)]
50 pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55 #[serde(default)]
56 pub name: String,
57 #[serde(default = "default_time")]
58 pub creation_date: DateTime<Utc>,
59 #[serde(default)]
60 pub region: String,
61 #[serde(default)]
62 pub versioning: Option<String>,
63 #[serde(default)]
64 pub acl: Option<String>,
65 #[serde(default)]
66 pub acl_owner_id: String,
67 #[serde(default)]
68 pub accelerate_status: Option<String>,
69 #[serde(default)]
70 pub eventbridge_enabled: bool,
71 #[serde(default)]
72 pub lifecycle_transition_default_min_size: Option<String>,
73}
74
75fn default_time() -> DateTime<Utc> {
76 DateTime::<Utc>::MIN_UTC
77}
78
79#[derive(Debug, Default, Clone, Serialize, Deserialize)]
80pub struct AclGrantSnapshot {
81 pub grantee_type: String,
82 #[serde(default)]
83 pub grantee_id: Option<String>,
84 #[serde(default)]
85 pub grantee_display_name: Option<String>,
86 #[serde(default)]
87 pub grantee_uri: Option<String>,
88 pub permission: String,
89}
90
91#[derive(Debug, Default, Clone, Serialize, Deserialize)]
92pub struct TagsSnapshot {
93 #[serde(default)]
94 pub tags: BTreeMap<String, String>,
95}
96
97#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct AclSnapshot {
99 #[serde(default)]
100 pub owner_id: String,
101 #[serde(default)]
102 pub grants: Vec<AclGrantSnapshot>,
103}
104
105#[derive(Debug, Default, Clone, Serialize, Deserialize)]
106pub struct InventorySnapshot {
107 #[serde(default)]
108 pub configs: BTreeMap<String, String>,
109}
110
111#[derive(Debug, Default, Clone, Serialize, Deserialize)]
112pub struct ObjectMeta {
113 #[serde(default)]
114 pub key: String,
115 #[serde(default)]
116 pub content_type: String,
117 #[serde(default)]
118 pub etag: String,
119 #[serde(default)]
120 pub size: u64,
121 #[serde(default = "default_time")]
122 pub last_modified: DateTime<Utc>,
123 #[serde(default)]
124 pub metadata: BTreeMap<String, String>,
125 #[serde(default)]
126 pub tags: BTreeMap<String, String>,
127 #[serde(default)]
128 pub storage_class: String,
129 #[serde(default)]
130 pub acl_grants: Vec<AclGrantSnapshot>,
131 #[serde(default)]
132 pub acl_owner_id: Option<String>,
133 #[serde(default)]
134 pub parts_count: Option<u32>,
135 #[serde(default)]
136 pub part_sizes: Option<Vec<(u32, u64)>>,
137 #[serde(default)]
138 pub sse_algorithm: Option<String>,
139 #[serde(default)]
140 pub sse_kms_key_id: Option<String>,
141 #[serde(default)]
142 pub bucket_key_enabled: Option<bool>,
143 #[serde(default)]
144 pub version_id: Option<String>,
145 #[serde(default)]
146 pub is_delete_marker: bool,
147 #[serde(default)]
148 pub restore_ongoing: Option<bool>,
149 #[serde(default)]
150 pub restore_expiry: Option<String>,
151 #[serde(default)]
152 pub checksum_algorithm: Option<String>,
153 #[serde(default)]
154 pub checksum_value: Option<String>,
155 #[serde(default)]
156 pub lock_mode: Option<String>,
157 #[serde(default)]
158 pub lock_retain_until: Option<DateTime<Utc>>,
159 #[serde(default)]
160 pub lock_legal_hold: Option<String>,
161 #[serde(default)]
162 pub content_encoding: Option<String>,
163 #[serde(default)]
164 pub website_redirect_location: Option<String>,
165}
166
167#[derive(Debug, Default, Clone, Serialize, Deserialize)]
168pub struct MpuInit {
169 pub upload_id: String,
170 pub key: String,
171 #[serde(default = "default_time")]
172 pub initiated: DateTime<Utc>,
173 #[serde(default)]
174 pub metadata: BTreeMap<String, String>,
175 #[serde(default)]
176 pub content_type: String,
177 #[serde(default)]
178 pub storage_class: String,
179 #[serde(default)]
180 pub sse_algorithm: Option<String>,
181 #[serde(default)]
182 pub sse_kms_key_id: Option<String>,
183 #[serde(default)]
184 pub tagging: Option<String>,
185 #[serde(default)]
186 pub acl_grants: Vec<AclGrantSnapshot>,
187 #[serde(default)]
188 pub checksum_algorithm: Option<String>,
189}
190
191#[derive(Debug, Default, Clone, Serialize, Deserialize)]
192pub struct UploadPartMeta {
193 pub part_number: u32,
194 pub etag: String,
195 pub size: u64,
196 #[serde(default = "default_time")]
197 pub last_modified: DateTime<Utc>,
198}
199
/// Kinds of per-bucket configuration payload a store can persist separately
/// from the bucket's metadata record.
///
/// The disk store maps each kind to its own file inside the bucket directory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
226
/// Every `BucketSubresource` variant, in declaration order.
///
/// The disk loader iterates this list to probe a bucket directory for each
/// subresource file, so a variant missing here would never be loaded back.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
 BucketSubresource::Tags,
 BucketSubresource::Lifecycle,
 BucketSubresource::Cors,
 BucketSubresource::Policy,
 BucketSubresource::Notification,
 BucketSubresource::Logging,
 BucketSubresource::Website,
 BucketSubresource::PublicAccessBlock,
 BucketSubresource::ObjectLock,
 BucketSubresource::Replication,
 BucketSubresource::Ownership,
 BucketSubresource::Inventory,
 BucketSubresource::Encryption,
 BucketSubresource::Versioning,
 BucketSubresource::Acl,
 BucketSubresource::Accelerate,
 BucketSubresource::Analytics,
 BucketSubresource::IntelligentTiering,
 BucketSubresource::Metrics,
 BucketSubresource::RequestPayment,
 BucketSubresource::Abac,
 BucketSubresource::MetadataConfiguration,
 BucketSubresource::MetadataTableConfiguration,
];
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub enum BodyRef {
255 #[serde(skip)]
256 Memory(Bytes),
257 Disk {
258 bucket: String,
259 key: String,
260 #[serde(default)]
261 version: Option<String>,
262 path: PathBuf,
263 size: u64,
264 },
265}
266
267impl BodyRef {
268 pub fn size(&self) -> u64 {
269 match self {
270 BodyRef::Memory(b) => b.len() as u64,
271 BodyRef::Disk { size, .. } => *size,
272 }
273 }
274}
275
276impl Default for BodyRef {
277 fn default() -> Self {
278 BodyRef::Memory(Bytes::new())
279 }
280}
281
282#[derive(Debug)]
283pub enum BodySource {
284 Bytes(Bytes),
285 File(PathBuf),
288 FileCopy(PathBuf),
292}
293
294fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
300 match body {
301 BodySource::Bytes(b) => Ok(b),
302 BodySource::File(p) => {
303 let bytes = std::fs::read(&p)?;
304 let _ = std::fs::remove_file(&p);
307 Ok(Bytes::from(bytes))
308 }
309 BodySource::FileCopy(p) => {
310 let bytes = std::fs::read(&p)?;
311 Ok(Bytes::from(bytes))
312 }
313 }
314}
315
316#[derive(Debug, Error)]
317pub enum StoreError {
318 #[error("io error: {0}")]
319 Io(#[from] std::io::Error),
320 #[error("serialization error: {0}")]
321 Serde(String),
322 #[error("not supported by this store")]
323 NotSupported,
324 #[error("{0}")]
325 Other(String),
326}
327
/// Result alias used throughout the store API; the error type is [`StoreError`].
pub type StoreResult<T> = Result<T, StoreError>;
329
330pub trait S3Store: Send + Sync {
331 fn load(&self) -> StoreResult<S3State>;
332
333 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
334 fn put_bucket_subresource(
335 &self,
336 bucket: &str,
337 kind: BucketSubresource,
338 payload: &str,
339 ) -> StoreResult<()>;
340 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
341 fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
342
343 fn put_object(
344 &self,
345 bucket: &str,
346 key: &str,
347 version: Option<&str>,
348 body: BodySource,
349 meta: &ObjectMeta,
350 ) -> StoreResult<BodyRef>;
351 fn put_object_meta(
352 &self,
353 bucket: &str,
354 key: &str,
355 version: Option<&str>,
356 meta: &ObjectMeta,
357 ) -> StoreResult<()>;
358 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
359 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
360
361 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
362 fn mpu_put_part(
368 &self,
369 bucket: &str,
370 upload_id: &str,
371 part_number: u32,
372 body: BodySource,
373 etag: &str,
374 ) -> StoreResult<BodyRef>;
375 fn spool_dir(&self) -> Option<std::path::PathBuf> {
382 None
383 }
384 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
385 fn mpu_complete(
386 &self,
387 bucket: &str,
388 upload_id: &str,
389 final_key: &str,
390 version: Option<&str>,
391 meta: &ObjectMeta,
392 ) -> StoreResult<BodyRef>;
393}
394
/// Stateless store that persists nothing.
///
/// Bodies handed to it are returned as in-memory handles and every other
/// operation is a no-op, so nothing survives beyond the caller's own state.
#[derive(Default)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the store. Equivalent to `MemoryS3Store::default()`.
    pub fn new() -> Self {
        MemoryS3Store
    }
}
408
409impl S3Store for MemoryS3Store {
410 fn load(&self) -> StoreResult<S3State> {
411 Ok(S3State::default())
412 }
413
414 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
415 Ok(())
416 }
417 fn put_bucket_subresource(
418 &self,
419 _bucket: &str,
420 _kind: BucketSubresource,
421 _payload: &str,
422 ) -> StoreResult<()> {
423 Ok(())
424 }
425 fn delete_bucket_subresource(
426 &self,
427 _bucket: &str,
428 _kind: BucketSubresource,
429 ) -> StoreResult<()> {
430 Ok(())
431 }
432 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
433 Ok(())
434 }
435
436 fn put_object(
437 &self,
438 _bucket: &str,
439 _key: &str,
440 _version: Option<&str>,
441 body: BodySource,
442 _meta: &ObjectMeta,
443 ) -> StoreResult<BodyRef> {
444 Ok(BodyRef::Memory(read_body_source(body)?))
445 }
446 fn put_object_meta(
447 &self,
448 _bucket: &str,
449 _key: &str,
450 _version: Option<&str>,
451 _meta: &ObjectMeta,
452 ) -> StoreResult<()> {
453 Ok(())
454 }
455 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
456 Ok(())
457 }
458 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
459 match body {
460 BodyRef::Memory(b) => Ok(b.clone()),
461 BodyRef::Disk { .. } => {
462 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
463 }
464 }
465 }
466
467 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
468 Ok(())
469 }
470 fn mpu_put_part(
471 &self,
472 _bucket: &str,
473 _upload_id: &str,
474 _part_number: u32,
475 body: BodySource,
476 _etag: &str,
477 ) -> StoreResult<BodyRef> {
478 Ok(BodyRef::Memory(read_body_source(body)?))
481 }
482 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
483 Ok(())
484 }
485 fn mpu_complete(
486 &self,
487 _bucket: &str,
488 _upload_id: &str,
489 _final_key: &str,
490 _version: Option<&str>,
491 _meta: &ObjectMeta,
492 ) -> StoreResult<BodyRef> {
493 Ok(BodyRef::Memory(Bytes::new()))
494 }
495}
496
/// Store that persists buckets, objects and multipart uploads as files
/// beneath a root directory, with a shared cache for object bodies.
pub struct DiskS3Store {
 /// Directory under which all store data (and the `.spool` directory) lives.
 root: PathBuf,
 /// Body cache consulted before reading a body file and kept in sync on
 /// writes and deletes.
 cache: std::sync::Arc<crate::cache::BodyCache>,
}
501
502impl DiskS3Store {
503 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
504 Self { root, cache }
505 }
506
507 fn buckets_dir(&self) -> PathBuf {
508 self.root.join("buckets")
509 }
510
511 fn bucket_dir(&self, bucket: &str) -> PathBuf {
512 self.buckets_dir()
513 .join(crate::key_escape::escape_key_segment(bucket))
514 }
515
516 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
517 self.bucket_dir(bucket)
518 .join("objects")
519 .join(crate::key_escape::escape_key_segment(key))
520 }
521
522 fn version_tag(version: Option<&str>) -> String {
523 version.unwrap_or("null").to_string()
524 }
525
526 fn object_paths(
527 &self,
528 bucket: &str,
529 key: &str,
530 version: Option<&str>,
531 ) -> (PathBuf, PathBuf, PathBuf) {
532 let dir = self.object_dir(bucket, key);
533 let tag = Self::version_tag(version);
534 let bin = dir.join(format!("{}.bin", tag));
535 let toml = dir.join(format!("{}.toml", tag));
536 (dir, bin, toml)
537 }
538
539 fn subresource_filename(kind: BucketSubresource) -> &'static str {
540 match kind {
541 BucketSubresource::Tags => "tags.toml",
542 BucketSubresource::Lifecycle => "lifecycle.toml",
543 BucketSubresource::Cors => "cors.toml",
544 BucketSubresource::Policy => "policy.toml",
545 BucketSubresource::Notification => "notification.toml",
546 BucketSubresource::Logging => "logging.toml",
547 BucketSubresource::Website => "website.toml",
548 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
549 BucketSubresource::ObjectLock => "object_lock.toml",
550 BucketSubresource::Replication => "replication.toml",
551 BucketSubresource::Ownership => "ownership.toml",
552 BucketSubresource::Inventory => "inventory.toml",
553 BucketSubresource::Encryption => "encryption.toml",
554 BucketSubresource::Versioning => "versioning.toml",
555 BucketSubresource::Acl => "acl.toml",
556 BucketSubresource::Accelerate => "accelerate.toml",
557 BucketSubresource::Analytics => "analytics.toml",
558 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
559 BucketSubresource::Metrics => "metrics.toml",
560 BucketSubresource::RequestPayment => "request_payment.toml",
561 BucketSubresource::Abac => "abac.toml",
562 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
563 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
564 }
565 }
566
567 fn cleanup_empty(dir: &std::path::Path) {
568 let _ = std::fs::remove_dir(dir);
569 }
570
571 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
572 self.bucket_dir(bucket)
573 .join("mpu")
574 .join(crate::key_escape::escape_key_segment(upload_id))
575 }
576
577 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
578 self.mpu_dir(bucket, upload_id).join("parts")
579 }
580
581 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
582 self.mpu_parts_dir(bucket, upload_id)
583 .join(format!("{}.bin", part_number))
584 }
585
586 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
587 self.mpu_parts_dir(bucket, upload_id)
588 .join(format!("{}.toml", part_number))
589 }
590
591 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
592 crate::cache::BodyKey::new(
593 bucket.to_string(),
594 format!("__mpu__/{}", upload_id),
595 Some(format!("part-{}", part_number)),
596 )
597 }
598}
599
600fn io_other(msg: impl Into<String>) -> StoreError {
601 StoreError::Other(msg.into())
602}
603
604impl S3Store for DiskS3Store {
605 fn load(&self) -> StoreResult<S3State> {
606 let mut state = S3State::default();
607 let buckets_dir = self.buckets_dir();
608 if !buckets_dir.exists() {
609 return Ok(state);
610 }
611 for entry in std::fs::read_dir(&buckets_dir)? {
612 let entry = entry?;
613 if !entry.file_type()?.is_dir() {
614 continue;
615 }
616 let bdir = entry.path();
617 let meta_path = bdir.join("meta.toml");
618 if !meta_path.exists() {
619 continue;
620 }
621 let meta_text = std::fs::read_to_string(&meta_path)?;
622 let mut meta: BucketMeta =
623 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
624 let mut snap = BucketSnapshot {
625 meta: meta.clone(),
626 objects: BTreeMap::new(),
627 object_versions: BTreeMap::new(),
628 subresources: BTreeMap::new(),
629 multipart_uploads: BTreeMap::new(),
630 };
631
632 for kind in ALL_SUBRESOURCES {
633 let fname = Self::subresource_filename(*kind);
634 let path = bdir.join(fname);
635 if path.exists() {
636 let text = std::fs::read_to_string(&path)?;
637 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
638 let stripped = text.trim();
639 if !stripped.is_empty() {
640 snap.meta.versioning = Some(stripped.to_string());
641 meta.versioning = snap.meta.versioning.clone();
642 }
643 }
644 snap.subresources.insert(fname.to_string(), text);
645 }
646 }
647
648 let objects_root = bdir.join("objects");
649 if objects_root.exists() {
650 for okey_entry in std::fs::read_dir(&objects_root)? {
651 let okey_entry = okey_entry?;
652 if !okey_entry.file_type()?.is_dir() {
653 continue;
654 }
655 let key_dir = okey_entry.path();
656 let mut versioned: Vec<LoadedObject> = Vec::new();
657 let mut key_name: Option<String> = None;
658 for version_entry in std::fs::read_dir(&key_dir)? {
659 let version_entry = version_entry?;
660 let path = version_entry.path();
661 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
662 continue;
663 };
664 if !fname.ends_with(".toml") {
665 continue;
666 }
667 let version_tag = &fname[..fname.len() - 5];
668 let toml_text = std::fs::read_to_string(&path)?;
669 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
670 .map_err(|e| StoreError::Serde(e.to_string()))?;
671 let bin_path = key_dir.join(format!("{}.bin", version_tag));
672 let (body, size) = if obj_meta.is_delete_marker {
673 (BodyRef::Memory(Bytes::new()), 0u64)
674 } else if bin_path.exists() {
675 let sz = std::fs::metadata(&bin_path)?.len();
676 (
677 BodyRef::Disk {
678 bucket: meta.name.clone(),
679 key: obj_meta.key.clone(),
680 version: if version_tag == "null" {
681 None
682 } else {
683 Some(version_tag.to_string())
684 },
685 path: bin_path,
686 size: sz,
687 },
688 sz,
689 )
690 } else {
691 return Err(StoreError::Other(format!(
696 "missing body file: {}",
697 bin_path.display()
698 )));
699 };
700 let _ = size;
701 key_name.get_or_insert_with(|| obj_meta.key.clone());
702 if version_tag == "null" && obj_meta.version_id.is_none() {
703 snap.objects.insert(
704 obj_meta.key.clone(),
705 LoadedObject {
706 meta: obj_meta,
707 body,
708 },
709 );
710 } else {
711 versioned.push(LoadedObject {
712 meta: obj_meta,
713 body,
714 });
715 }
716 }
717 if !versioned.is_empty() {
718 versioned.sort_by_key(|v| v.meta.last_modified);
719 if let Some(key) = key_name {
720 match versioned.last() {
727 Some(newest) if newest.meta.is_delete_marker => {
728 snap.objects.remove(&key);
729 }
730 Some(newest) => {
731 snap.objects.insert(key.clone(), newest.clone());
732 }
733 None => {}
734 }
735 snap.object_versions.insert(key, versioned);
736 }
737 }
738 }
739 }
740
741 let mpu_root = bdir.join("mpu");
742 if mpu_root.exists() {
743 for upload_entry in std::fs::read_dir(&mpu_root)? {
744 let upload_entry = upload_entry?;
745 if !upload_entry.file_type()?.is_dir() {
746 continue;
747 }
748 let upload_dir = upload_entry.path();
749 let init_path = upload_dir.join("init.toml");
750 if !init_path.exists() {
751 continue;
752 }
753 let init_text = std::fs::read_to_string(&init_path)?;
754 let init: MpuInit =
755 toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
756 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
757 let parts_dir = upload_dir.join("parts");
758 if parts_dir.exists() {
759 for part_entry in std::fs::read_dir(&parts_dir)? {
760 let part_entry = part_entry?;
761 let path = part_entry.path();
762 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
763 continue;
764 };
765 if !fname.ends_with(".toml") {
766 continue;
767 }
768 let stem = &fname[..fname.len() - 5];
769 let Ok(part_number) = stem.parse::<u32>() else {
770 continue;
771 };
772 let toml_text = std::fs::read_to_string(&path)?;
773 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
774 .map_err(|e| StoreError::Serde(e.to_string()))?;
775 let bin_path = parts_dir.join(format!("{}.bin", part_number));
776 if !bin_path.exists() {
777 return Err(StoreError::Other(format!(
778 "missing multipart part body file: {}",
779 bin_path.display()
780 )));
781 }
782 let sz = std::fs::metadata(&bin_path)?.len();
783 let body = BodyRef::Disk {
784 bucket: meta.name.clone(),
785 key: format!("__mpu__/{}", init.upload_id),
786 version: Some(format!("part-{}", part_number)),
787 path: bin_path,
788 size: sz,
789 };
790 loaded_parts.insert(
791 part_number,
792 LoadedPart {
793 meta: part_meta,
794 body,
795 },
796 );
797 }
798 }
799 snap.multipart_uploads.insert(
800 init.upload_id.clone(),
801 LoadedMpu {
802 init,
803 parts: loaded_parts,
804 },
805 );
806 }
807 }
808
809 state.buckets.insert(meta.name.clone(), snap);
810 }
811 Ok(state)
812 }
813
814 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
815 let dir = self.bucket_dir(bucket);
816 std::fs::create_dir_all(&dir)?;
817 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
818 Ok(())
819 }
820
821 fn put_bucket_subresource(
822 &self,
823 bucket: &str,
824 kind: BucketSubresource,
825 payload: &str,
826 ) -> StoreResult<()> {
827 let dir = self.bucket_dir(bucket);
828 std::fs::create_dir_all(&dir)?;
829 let path = dir.join(Self::subresource_filename(kind));
830 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
831 Ok(())
832 }
833
834 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
835 let path = self
836 .bucket_dir(bucket)
837 .join(Self::subresource_filename(kind));
838 match std::fs::remove_file(&path) {
839 Ok(_) => Ok(()),
840 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
841 Err(e) => Err(e.into()),
842 }
843 }
844
845 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
846 let dir = self.bucket_dir(bucket);
847 match std::fs::remove_dir_all(&dir) {
848 Ok(_) => Ok(()),
849 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
850 Err(e) => Err(e.into()),
851 }
852 }
853
854 fn put_object(
855 &self,
856 bucket: &str,
857 key: &str,
858 version: Option<&str>,
859 body: BodySource,
860 meta: &ObjectMeta,
861 ) -> StoreResult<BodyRef> {
862 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
863 std::fs::create_dir_all(&dir)?;
864
865 if meta.is_delete_marker {
866 crate::atomic::write_atomic_toml(&toml_path, meta)?;
867 return Ok(BodyRef::Memory(Bytes::new()));
868 }
869
870 let size: u64;
871 let bytes_for_cache: Option<Bytes>;
872 match body {
873 BodySource::Bytes(b) => {
874 size = b.len() as u64;
875 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
876 bytes_for_cache = Some(b);
877 }
878 BodySource::File(src) => {
879 let src_size = std::fs::metadata(&src)?.len();
880 size = src_size;
881 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
882 bytes_for_cache = None;
883 }
884 BodySource::FileCopy(src) => {
885 let src_size = std::fs::metadata(&src)?.len();
886 size = src_size;
887 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
888 bytes_for_cache = None;
889 }
890 }
891
892 crate::atomic::write_atomic_toml(&toml_path, meta)?;
893
894 let body_key = crate::cache::BodyKey::new(
895 bucket.to_string(),
896 key.to_string(),
897 version.map(|s| s.to_string()),
898 );
899 if let Some(b) = bytes_for_cache {
900 self.cache.insert(body_key, b);
901 } else {
902 self.cache.invalidate(&crate::cache::BodyKey::new(
903 bucket.to_string(),
904 key.to_string(),
905 version.map(|s| s.to_string()),
906 ));
907 }
908
909 Ok(BodyRef::Disk {
910 bucket: bucket.to_string(),
911 key: key.to_string(),
912 version: version.map(|s| s.to_string()),
913 path: bin_path,
914 size,
915 })
916 }
917
918 fn put_object_meta(
919 &self,
920 bucket: &str,
921 key: &str,
922 version: Option<&str>,
923 meta: &ObjectMeta,
924 ) -> StoreResult<()> {
925 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
926 std::fs::create_dir_all(&dir)?;
927 crate::atomic::write_atomic_toml(&toml_path, meta)?;
928 Ok(())
929 }
930
931 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
932 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
933 for p in [&bin_path, &toml_path] {
934 match std::fs::remove_file(p) {
935 Ok(_) => {}
936 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
937 Err(e) => return Err(e.into()),
938 }
939 }
940 Self::cleanup_empty(&dir);
941
942 self.cache.invalidate(&crate::cache::BodyKey::new(
943 bucket.to_string(),
944 key.to_string(),
945 version.map(|s| s.to_string()),
946 ));
947 Ok(())
948 }
949
950 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
951 match body {
952 BodyRef::Memory(b) => Ok(b.clone()),
953 BodyRef::Disk {
954 bucket,
955 key,
956 version,
957 path,
958 size: _,
959 } => {
960 let body_key =
961 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
962 if let Some(bytes) = self.cache.get(&body_key) {
963 return Ok(bytes);
964 }
965 let bytes = Bytes::from(std::fs::read(path)?);
966 self.cache.insert(body_key, bytes.clone());
967 Ok(bytes)
968 }
969 }
970 }
971
972 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
973 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
974 std::fs::create_dir_all(&parts_dir)?;
975 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
976 crate::atomic::write_atomic_toml(&init_path, init)?;
977 Ok(())
978 }
979
980 fn mpu_put_part(
981 &self,
982 bucket: &str,
983 upload_id: &str,
984 part_number: u32,
985 body: BodySource,
986 etag: &str,
987 ) -> StoreResult<BodyRef> {
988 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
989 std::fs::create_dir_all(&parts_dir)?;
990 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
991 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
992
993 let size: u64 = match body {
994 BodySource::Bytes(b) => {
995 let n = b.len() as u64;
996 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
997 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
998 self.cache.insert(cache_key, b);
999 n
1000 }
1001 BodySource::File(src) => {
1002 let n = std::fs::metadata(&src)?.len();
1003 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1004 self.cache
1005 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1006 n
1007 }
1008 BodySource::FileCopy(src) => {
1009 let n = std::fs::metadata(&src)?.len();
1010 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1011 self.cache
1012 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1013 n
1014 }
1015 };
1016
1017 let meta = UploadPartMeta {
1018 part_number,
1019 etag: etag.to_string(),
1020 size,
1021 last_modified: Utc::now(),
1022 };
1023 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1024 Ok(BodyRef::Disk {
1027 bucket: bucket.to_string(),
1028 key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1029 version: None,
1030 path: bin_path,
1031 size,
1032 })
1033 }
1034
1035 fn spool_dir(&self) -> Option<std::path::PathBuf> {
1036 let dir = self.root.join(".spool");
1037 let _ = std::fs::create_dir_all(&dir);
1041 Some(dir)
1042 }
1043
1044 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1045 let dir = self.mpu_dir(bucket, upload_id);
1046 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1048 for entry in entries.flatten() {
1049 let path = entry.path();
1050 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1051 if let Some(stem) = fname.strip_suffix(".bin") {
1052 if let Ok(n) = stem.parse::<u32>() {
1053 self.cache
1054 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1055 }
1056 }
1057 }
1058 }
1059 }
1060 match std::fs::remove_dir_all(&dir) {
1061 Ok(_) => Ok(()),
1062 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1063 Err(e) => Err(e.into()),
1064 }
1065 }
1066
1067 fn mpu_complete(
1068 &self,
1069 bucket: &str,
1070 upload_id: &str,
1071 final_key: &str,
1072 version: Option<&str>,
1073 meta: &ObjectMeta,
1074 ) -> StoreResult<BodyRef> {
1075 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1076 if !parts_dir.exists() {
1077 return Err(io_other(format!(
1078 "mpu_complete: no parts dir for upload {}",
1079 upload_id
1080 )));
1081 }
1082
1083 let mut part_numbers: Vec<u32> = Vec::new();
1085 for entry in std::fs::read_dir(&parts_dir)? {
1086 let entry = entry?;
1087 let path = entry.path();
1088 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1089 continue;
1090 };
1091 if let Some(stem) = fname.strip_suffix(".toml") {
1092 if let Ok(n) = stem.parse::<u32>() {
1093 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1094 part_numbers.push(n);
1095 }
1096 }
1097 }
1098 }
1099 part_numbers.sort_unstable();
1100 if part_numbers.is_empty() {
1101 return Err(io_other(format!(
1102 "mpu_complete: upload {} has no parts",
1103 upload_id
1104 )));
1105 }
1106
1107 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1108 std::fs::create_dir_all(&dir)?;
1109
1110 let total_size: u64 = if part_numbers.len() == 1 {
1111 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1112 let sz = std::fs::metadata(&only)?.len();
1113 match std::fs::rename(&only, &bin_path) {
1114 Ok(_) => {}
1115 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1116 {
1118 let mut input = std::fs::File::open(&only)?;
1119 let tmp = {
1120 let mut os = bin_path.as_os_str().to_owned();
1121 os.push(".tmp");
1122 PathBuf::from(os)
1123 };
1124 let mut out = std::fs::OpenOptions::new()
1125 .write(true)
1126 .create(true)
1127 .truncate(true)
1128 .open(&tmp)?;
1129 std::io::copy(&mut input, &mut out)?;
1130 out.sync_all()?;
1131 std::fs::rename(&tmp, &bin_path)?;
1132 }
1133 let _ = std::fs::remove_file(&only);
1134 }
1135 Err(e) => return Err(e.into()),
1136 }
1137 sz
1138 } else {
1139 let tmp = {
1140 let mut os = bin_path.as_os_str().to_owned();
1141 os.push(".tmp");
1142 PathBuf::from(os)
1143 };
1144 let mut total: u64 = 0;
1145 {
1146 let mut out = std::fs::OpenOptions::new()
1147 .write(true)
1148 .create(true)
1149 .truncate(true)
1150 .open(&tmp)?;
1151 for n in &part_numbers {
1152 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1153 let mut input = std::fs::File::open(&part_path)?;
1154 let copied = std::io::copy(&mut input, &mut out)?;
1155 total += copied;
1156 }
1157 out.sync_all()?;
1158 }
1159 std::fs::rename(&tmp, &bin_path)?;
1160 if let Some(parent) = bin_path.parent() {
1161 if let Ok(dir_handle) = std::fs::File::open(parent) {
1162 let _ = dir_handle.sync_all();
1163 }
1164 }
1165 total
1166 };
1167
1168 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1169
1170 for n in &part_numbers {
1172 self.cache
1173 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1174 }
1175 let mpu_dir = self.mpu_dir(bucket, upload_id);
1176 let _ = std::fs::remove_dir_all(&mpu_dir);
1177
1178 self.cache.invalidate(&crate::cache::BodyKey::new(
1183 bucket.to_string(),
1184 final_key.to_string(),
1185 version.map(|s| s.to_string()),
1186 ));
1187
1188 Ok(BodyRef::Disk {
1189 bucket: bucket.to_string(),
1190 key: final_key.to_string(),
1191 version: version.map(|s| s.to_string()),
1192 path: bin_path,
1193 size: total_size,
1194 })
1195 }
1196}
1197
/// Raw OS error number compared against a failed `rename` to detect a
/// cross-device move (`EXDEV`, which is 18 on Linux).
fn libc_exdev() -> i32 {
    const EXDEV: i32 = 18;
    EXDEV
}
1201
1202#[cfg(test)]
1203mod disk_tests {
1204 use super::*;
1205 use std::sync::Arc;
1206 use tempfile::TempDir;
1207
1208 fn new_store(tmp: &TempDir) -> DiskS3Store {
1209 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1210 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1211 }
1212
1213 fn new_store_with_cache(
1214 tmp: &TempDir,
1215 cap: u64,
1216 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1217 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1218 (
1219 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1220 cache,
1221 )
1222 }
1223
1224 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1225 ObjectMeta {
1226 key: key.to_string(),
1227 content_type: "application/octet-stream".to_string(),
1228 etag: "etag".to_string(),
1229 size,
1230 ..Default::default()
1231 }
1232 }
1233
/// Bucket metadata written with `put_bucket_meta` comes back from `load`
/// with the same name, region and versioning value.
#[test]
fn put_bucket_meta_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let written = BucketMeta {
        name: String::from("b1"),
        region: String::from("us-east-1"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b1", &written).unwrap();

    let state = store.load().unwrap();
    let reloaded = &state.buckets.get("b1").unwrap().meta;
    assert_eq!(reloaded.name, "b1");
    assert_eq!(reloaded.region, "us-east-1");
    assert_eq!(reloaded.versioning.as_deref(), Some("Enabled"));
}
1251
/// For each listed `BucketSubresource` variant, `put_bucket_subresource`
/// creates the file named by `subresource_filename` inside the bucket
/// directory, and that file holds the payload verbatim.
#[test]
fn put_bucket_subresource_each_variant_writes_file() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let kinds = [
        BucketSubresource::Tags,
        BucketSubresource::Lifecycle,
        BucketSubresource::Cors,
        BucketSubresource::Policy,
        BucketSubresource::Notification,
        BucketSubresource::Logging,
        BucketSubresource::Website,
        BucketSubresource::PublicAccessBlock,
        BucketSubresource::ObjectLock,
        BucketSubresource::Replication,
        BucketSubresource::Ownership,
        BucketSubresource::Inventory,
        BucketSubresource::Encryption,
        BucketSubresource::Versioning,
        BucketSubresource::Acl,
        BucketSubresource::Accelerate,
    ];
    for kind in kinds {
        store
            .put_bucket_subresource("b", kind, "payload=true")
            .unwrap();
        let dir = store.bucket_dir("b");
        let on_disk = dir.join(DiskS3Store::subresource_filename(kind));
        // The variant is printed on failure so the offending kind is obvious.
        assert!(on_disk.exists(), "{:?}", kind);
        let contents = std::fs::read_to_string(&on_disk).unwrap();
        assert_eq!(contents, "payload=true");
    }
}
1294
/// Deleting a subresource removes `tags.toml`, and deleting it a second
/// time (when the file is already gone) still returns `Ok`.
#[test]
fn delete_bucket_subresource_removes_file() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let kind = BucketSubresource::Tags;
    store.put_bucket_subresource("b", kind, "x=1").unwrap();
    store.delete_bucket_subresource("b", kind).unwrap();

    let tags_file = store.bucket_dir("b").join("tags.toml");
    assert!(!tags_file.exists());

    // Second delete: nothing is left on disk, and the call must not fail.
    store.delete_bucket_subresource("b", kind).unwrap();
}
1321
/// Storing an in-memory body yields a `BodyRef::Disk` whose file holds the
/// bytes, and `load` reports the object with the matching size.
#[test]
fn put_object_bytes_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"hello world");
    let expected_len = payload.len() as u64;
    let meta = sample_meta("k1", expected_len);
    let stored = store
        .put_object("b", "k1", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    if let BodyRef::Disk {
        bucket,
        key,
        size,
        path,
        ..
    } = &stored
    {
        assert_eq!(bucket, "b");
        assert_eq!(key, "k1");
        assert_eq!(*size, expected_len);
        assert_eq!(std::fs::read(path).unwrap(), payload.to_vec());
    } else {
        panic!("expected Disk");
    }

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let object = bucket.objects.get("k1").unwrap();
    assert_eq!(object.meta.size, expected_len);
}
1360
/// `BodySource::FileCopy` stores the body but leaves the source file in
/// place with its contents unchanged.
#[test]
fn put_object_file_copy_source_leaves_src_in_place() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // The source lives in its own temp dir, separate from the store root.
    let source_dir = TempDir::new().unwrap();
    let source_file = source_dir.path().join("src.bin");
    std::fs::write(&source_file, b"copied-body").unwrap();

    let meta = sample_meta("k", 11);
    let stored = store
        .put_object(
            "b",
            "k",
            None,
            BodySource::FileCopy(source_file.clone()),
            &meta,
        )
        .unwrap();
    if let BodyRef::Disk { path, size, .. } = stored {
        assert_eq!(size, 11);
        assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
    } else {
        panic!("expected Disk bodyref");
    }

    assert!(
        source_file.exists(),
        "source file must not be moved by FileCopy"
    );
    assert_eq!(std::fs::read(&source_file).unwrap(), b"copied-body");
}
1391
/// Storing a body from `BodySource::File` yields a `BodyRef::Disk` whose
/// file contains the source bytes.
#[test]
fn put_object_file_source() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                ..Default::default()
            },
        )
        .unwrap();
    let src = tmp.path().join("src.bin");
    std::fs::write(&src, b"file-body").unwrap();
    let meta = sample_meta("k", 9);
    let body_ref = store
        .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
        .unwrap();
    let path = match body_ref {
        BodyRef::Disk { path, .. } => path,
        // Use the same message as the sibling tests so a failure here is
        // self-explanatory instead of a bare "explicit panic".
        _ => panic!("expected Disk"),
    };
    assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
}
1417
/// `put_object_meta` rewrites metadata (here: adds a tag) without changing
/// the bytes of the object's body file.
#[test]
fn put_object_meta_only_keeps_bin() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let mut meta = sample_meta("k", 3);
    let payload = Bytes::from_static(b"abc");
    store
        .put_object("b", "k", None, BodySource::Bytes(payload), &meta)
        .unwrap();

    let (_, bin_path, _) = store.object_paths("b", "k", None);
    let body_before = std::fs::read(&bin_path).unwrap();

    meta.tags.insert(String::from("x"), String::from("y"));
    store.put_object_meta("b", "k", None, &meta).unwrap();
    let body_after = std::fs::read(&bin_path).unwrap();
    assert_eq!(body_after, body_before);

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let object = bucket.objects.get("k").unwrap();
    assert_eq!(object.meta.tags.get("x").map(String::as_str), Some("y"));
}
1445
/// After `delete_object`, the body file, the metadata file, the object
/// directory and the cache entry are all gone.
#[test]
fn delete_object_cleans_up_files_and_cache() {
    let scratch = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&scratch, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"bye");
    let meta = sample_meta("k", 3);
    store
        .put_object("b", "k", None, BodySource::Bytes(payload), &meta)
        .unwrap();

    // The put is expected to have left the body in the cache.
    let cache_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    assert!(cache.get(&cache_key).is_some());

    store.delete_object("b", "k", None).unwrap();

    let (dir, bin, toml_path) = store.object_paths("b", "k", None);
    for removed in [&bin, &toml_path, &dir] {
        assert!(!removed.exists());
    }
    assert!(cache.get(&cache_key).is_none());
}
1473
/// `open_object_body` returns the stored bytes both before and after the
/// cache entry is invalidated, and the second read puts the entry back.
#[test]
fn open_object_body_cache_hit_and_refill() {
    let scratch = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&scratch, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"payload");
    let meta = sample_meta("k", payload.len() as u64);
    let stored = store
        .put_object("b", "k", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    // First read: presumably served from the cache populated by the put.
    let first_read = store.open_object_body(&stored).unwrap();
    assert_eq!(first_read, payload);

    // Drop the cache entry so the next read cannot be a cache hit.
    let cache_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    cache.invalidate(&cache_key);
    assert!(cache.get(&cache_key).is_none());

    let second_read = store.open_object_body(&stored).unwrap();
    assert_eq!(second_read, payload);
    assert!(cache.get(&cache_key).is_some());
}
1503
/// With a cache capacity of 1024, an 800-byte body is not cached by the
/// put nor by a later `open_object_body`, yet the read returns the bytes.
// NOTE(review): the size threshold that makes a body "large" is decided by
// the store/cache and is not visible in this test.
#[test]
fn open_object_body_large_bypasses_cache() {
    let scratch = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&scratch, 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from(vec![7u8; 800]);
    let meta = sample_meta("big", 800);
    let stored = store
        .put_object("b", "big", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    let cache_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
    assert!(cache.get(&cache_key).is_none());

    let read_back = store.open_object_body(&stored).unwrap();
    assert_eq!(read_back, payload);
    assert!(cache.get(&cache_key).is_none());
}
1530
/// Loading from a brand-new, empty root succeeds and reports no buckets.
#[test]
fn load_empty_dir() {
    let scratch = TempDir::new().unwrap();
    let state = new_store(&scratch).load().unwrap();
    assert!(state.buckets.is_empty());
}
1538
/// An empty directory under `<bucket>/mpu/` is not reported as a multipart
/// upload by `load`, and it does not disturb loading the regular objects.
#[test]
fn load_skips_mpu_without_init() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let meta = sample_meta("k", 1);
    store
        .put_object(
            "b",
            "k",
            None,
            BodySource::Bytes(Bytes::from_static(b"x")),
            &meta,
        )
        .unwrap();

    // Create the upload directory by hand; nothing is written inside it.
    let orphan_upload = store.bucket_dir("b").join("mpu").join("upload1");
    std::fs::create_dir_all(&orphan_upload).unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert_eq!(bucket.objects.len(), 1);
    assert!(bucket.multipart_uploads.is_empty());
}
1566
/// Subresources written through the store come back from `load` in the
/// bucket's `subresources` map, keyed by their file name.
#[test]
fn load_reads_bucket_subresources() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // (variant, file name expected as map key, payload)
    let cases = [
        (BucketSubresource::Lifecycle, "lifecycle.toml", "<Lifecycle/>"),
        (BucketSubresource::Cors, "cors.toml", "<Cors/>"),
        (BucketSubresource::Policy, "policy.toml", "{}"),
        (BucketSubresource::Tags, "tags.toml", "x=1"),
    ];
    for &(kind, _, payload) in cases.iter() {
        store.put_bucket_subresource("b", kind, payload).unwrap();
    }

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    for &(_, file_name, payload) in cases.iter() {
        assert_eq!(
            bucket.subresources.get(file_name).map(String::as_str),
            Some(payload),
        );
    }
}
1611
/// Three versions of one key, written with increasing `last_modified`,
/// load back as three entries in write order, each with a non-empty
/// on-disk body.
#[test]
fn versioned_put_load_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let start = chrono::Utc::now();
    let revisions = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (step, &(vid, body)) in revisions.iter().enumerate() {
        let mut meta = sample_meta("k", body.len() as u64);
        meta.version_id = Some(vid.to_string());
        // One second apart so the timestamps are strictly increasing.
        meta.last_modified = start + chrono::Duration::seconds(step as i64);
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let versions = bucket.object_versions.get("k").expect("versions present");
    assert_eq!(versions.len(), 3);
    for (loaded, &(vid, _)) in versions.iter().zip(revisions.iter()) {
        assert_eq!(loaded.meta.version_id.as_deref(), Some(vid));
    }
    for loaded in versions {
        if let BodyRef::Disk { size, .. } = &loaded.body {
            assert!(*size > 0);
        } else {
            panic!("expected Disk body");
        }
    }
}
1658
/// After writing versions v1..v3 of a key, `load` exposes the one with the
/// latest `last_modified` (v3) as the current object in `objects`.
#[test]
fn versioned_load_promotes_latest_live_to_snap_objects() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let start = chrono::Utc::now();
    let revisions = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (step, &(vid, body)) in revisions.iter().enumerate() {
        let mut meta = sample_meta("k", body.len() as u64);
        meta.version_id = Some(vid.to_string());
        meta.last_modified = start + chrono::Duration::seconds(step as i64);
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let current = bucket.objects.get("k").expect("current object promoted");
    assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
}
1696
/// When the newest entry for a key is a delete marker, `load` lists all
/// four versions but exposes no current object for that key.
#[test]
fn versioned_load_trailing_delete_marker_hides_current() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let start = chrono::Utc::now();
    let revisions = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (step, &(vid, body)) in revisions.iter().enumerate() {
        let mut meta = sample_meta("k", body.len() as u64);
        meta.version_id = Some(vid.to_string());
        meta.last_modified = start + chrono::Duration::seconds(step as i64);
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    // The delete marker is timestamped after every real version.
    let mut marker = sample_meta("k", 0);
    marker.version_id = Some(String::from("dm1"));
    marker.is_delete_marker = true;
    marker.last_modified = start + chrono::Duration::seconds(10);
    store
        .put_object(
            "b",
            "k",
            Some("dm1"),
            BodySource::Bytes(Bytes::new()),
            &marker,
        )
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert!(
        !bucket.objects.contains_key("k"),
        "trailing delete marker must hide current object",
    );
    assert_eq!(bucket.object_versions.get("k").unwrap().len(), 4);
}
1745
/// An object stored without a version id, followed by versioned writes
/// v1 and v2 after versioning is enabled: `load` must report v2 (size 5)
/// as the current object rather than the older unversioned one.
#[test]
fn legacy_null_object_overridden_by_newer_versions() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);

    // Step 1: bucket without a versioning value, one unversioned object.
    let unversioned_bucket = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &unversioned_bucket).unwrap();
    let start = chrono::Utc::now();
    let mut null_meta = sample_meta("k", 3);
    null_meta.last_modified = start;
    let old_body = BodySource::Bytes(Bytes::from_static(b"old"));
    store.put_object("b", "k", None, old_body, &null_meta).unwrap();

    // Step 2: rewrite the bucket metadata with versioning "Enabled".
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    // Step 3: two versioned writes, each newer than the previous entry.
    let mut first = sample_meta("k", 3);
    first.version_id = Some(String::from("v1"));
    first.last_modified = start + chrono::Duration::seconds(1);
    let first_body = BodySource::Bytes(Bytes::from_static(b"new"));
    store
        .put_object("b", "k", Some("v1"), first_body, &first)
        .unwrap();

    let mut second = sample_meta("k", 5);
    second.version_id = Some(String::from("v2"));
    second.last_modified = start + chrono::Duration::seconds(2);
    let second_body = BodySource::Bytes(Bytes::from_static(b"newer"));
    store
        .put_object("b", "k", Some("v2"), second_body, &second)
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let current = bucket
        .objects
        .get("k")
        .expect("latest live version must override stale null");
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(current.meta.size, 5);
}
1817
/// An unversioned object, then version v1, then a newer delete marker:
/// `load` must expose no current object for the key.
#[test]
fn legacy_null_hidden_by_trailing_delete_marker() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);

    // Step 1: bucket without a versioning value, one unversioned object.
    let unversioned_bucket = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &unversioned_bucket).unwrap();
    let start = chrono::Utc::now();
    let mut null_meta = sample_meta("k", 3);
    null_meta.last_modified = start;
    let old_body = BodySource::Bytes(Bytes::from_static(b"old"));
    store.put_object("b", "k", None, old_body, &null_meta).unwrap();

    // Step 2: rewrite the bucket metadata with versioning "Enabled".
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    // Step 3: one real version, then a delete marker that is newer still.
    let mut first = sample_meta("k", 3);
    first.version_id = Some(String::from("v1"));
    first.last_modified = start + chrono::Duration::seconds(1);
    let first_body = BodySource::Bytes(Bytes::from_static(b"new"));
    store
        .put_object("b", "k", Some("v1"), first_body, &first)
        .unwrap();

    let mut marker = sample_meta("k", 0);
    marker.version_id = Some(String::from("dm1"));
    marker.is_delete_marker = true;
    marker.last_modified = start + chrono::Duration::seconds(2);
    store
        .put_object(
            "b",
            "k",
            Some("dm1"),
            BodySource::Bytes(Bytes::new()),
            &marker,
        )
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert!(
        !bucket.objects.contains_key("k"),
        "trailing delete marker must hide even a legacy null object",
    );
}
1879
/// A delete marker is persisted as metadata only (no `.bin` file) and
/// loads back as a single version whose body is an empty in-memory buffer.
#[test]
fn delete_marker_roundtrip_no_body_file() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let mut marker = sample_meta("k", 0);
    marker.version_id = Some(String::from("dm1"));
    marker.is_delete_marker = true;
    store
        .put_object(
            "b",
            "k",
            Some("dm1"),
            BodySource::Bytes(Bytes::new()),
            &marker,
        )
        .unwrap();

    let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
    assert!(!bin.exists(), "delete marker must not have a .bin file");
    assert!(toml_path.exists());

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let versions = bucket.object_versions.get("k").unwrap();
    assert_eq!(versions.len(), 1);

    let only = &versions[0];
    assert!(only.meta.is_delete_marker);
    if let BodyRef::Memory(bytes) = &only.body {
        assert_eq!(bytes.len(), 0);
    } else {
        panic!("delete marker body should be empty Memory");
    }
}
1920
/// One plain bucket and one versioned bucket in the same store load
/// independently: "a" has a single object and no version lists, "b" has
/// two versions of "twice" with v2 as the current object.
#[test]
fn mixed_nonversioned_and_versioned_buckets() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);

    let plain_bucket = BucketMeta {
        name: String::from("a"),
        ..Default::default()
    };
    store.put_bucket_meta("a", &plain_bucket).unwrap();
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    let only_meta = sample_meta("only", 3);
    let only_body = BodySource::Bytes(Bytes::from_static(b"aaa"));
    store
        .put_object("a", "only", None, only_body, &only_meta)
        .unwrap();

    let start = chrono::Utc::now();
    for (step, vid) in ["v1", "v2"].iter().copied().enumerate() {
        let mut meta = sample_meta("twice", 2);
        meta.version_id = Some(vid.to_string());
        meta.last_modified = start + chrono::Duration::seconds(step as i64);
        let body = BodySource::Bytes(Bytes::from_static(b"xx"));
        store
            .put_object("b", "twice", Some(vid), body, &meta)
            .unwrap();
    }

    let state = store.load().unwrap();
    assert_eq!(state.buckets.len(), 2);

    let bucket_a = state.buckets.get("a").unwrap();
    assert_eq!(bucket_a.objects.len(), 1);
    assert!(bucket_a.object_versions.is_empty());

    let bucket_b = state.buckets.get("b").unwrap();
    let current = bucket_b.objects.get("twice").unwrap();
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(bucket_b.object_versions.get("twice").unwrap().len(), 2);
}
1983
1984 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1985 MpuInit {
1986 upload_id: upload_id.to_string(),
1987 key: key.to_string(),
1988 initiated: chrono::Utc::now(),
1989 content_type: "application/octet-stream".to_string(),
1990 storage_class: "STANDARD".to_string(),
1991 ..Default::default()
1992 }
1993 }
1994
/// A multipart upload that was created but has no parts yet is reported
/// by `load` with its init record and an empty part map.
#[test]
fn mpu_create_then_load_empty_parts() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let init = sample_mpu_init("up1", "k1");
    store.mpu_create("b", "up1", &init).unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let upload = bucket
        .multipart_uploads
        .get("up1")
        .expect("upload present");
    assert_eq!(upload.init.upload_id, "up1");
    assert_eq!(upload.init.key, "k1");
    assert!(upload.parts.is_empty());
}
2017
/// Three uploaded parts load back under part numbers 1..=3 with the
/// right size, etag and on-disk bytes.
#[test]
fn mpu_put_part_then_load_three_parts() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    let init = sample_mpu_init("up", "k");
    store.mpu_create("b", "up", &init).unwrap();

    let bodies = [
        Bytes::from_static(b"part-one"),
        Bytes::from_static(b"part-two-longer"),
        Bytes::from_static(b"p3"),
    ];
    // Part numbers start at 1 and follow the order of `bodies`.
    for (part_no, body) in (1u32..).zip(bodies.iter()) {
        let etag = format!("et{}", part_no);
        store
            .mpu_put_part("b", "up", part_no, BodySource::Bytes(body.clone()), &etag)
            .unwrap();
    }

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let upload = bucket.multipart_uploads.get("up").unwrap();
    assert_eq!(upload.parts.len(), 3);

    for (part_no, body) in (1u32..).zip(bodies.iter()) {
        let expected_len = body.len() as u64;
        let part = upload.parts.get(&part_no).unwrap();
        assert_eq!(part.meta.part_number, part_no);
        assert_eq!(part.meta.size, expected_len);
        assert_eq!(part.meta.etag, format!("et{}", part_no));
        if let BodyRef::Disk { path, size, .. } = &part.body {
            assert_eq!(*size, expected_len);
            assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
        } else {
            panic!("expected Disk");
        }
    }
}
2070
/// Aborting an upload removes its directory; a second abort of the same
/// upload still returns `Ok`, and `load` no longer lists the upload.
#[test]
fn mpu_abort_removes_upload() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    let init = sample_mpu_init("up", "k");
    store.mpu_create("b", "up", &init).unwrap();

    let part_body = BodySource::Bytes(Bytes::from_static(b"x"));
    store.mpu_put_part("b", "up", 1, part_body, "e").unwrap();

    store.mpu_abort("b", "up").unwrap();
    assert!(!store.mpu_dir("b", "up").exists());

    // Aborting again, with the directory already gone, must not fail.
    store.mpu_abort("b", "up").unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert!(bucket.multipart_uploads.is_empty());
}
2104
/// Completing an upload that has exactly one part produces a Disk body
/// with that part's bytes and removes the upload directory.
#[test]
fn mpu_complete_single_part_fast_path() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    let init = sample_mpu_init("up", "k");
    store.mpu_create("b", "up", &init).unwrap();

    let part = Bytes::from_static(b"only-part-bytes");
    let expected_len = part.len() as u64;
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(part.clone()), "et")
        .unwrap();

    let meta = sample_meta("k", expected_len);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    if let BodyRef::Disk { path, size, .. } = &completed {
        assert_eq!(*size, expected_len);
        assert_eq!(std::fs::read(path).unwrap(), part.to_vec());
    } else {
        panic!("expected Disk");
    }
    assert!(!store.mpu_dir("b", "up").exists());
}
2136
/// Completing a three-part upload yields one body that is the parts
/// concatenated in part-number order, and removes the upload directory.
#[test]
fn mpu_complete_multi_part_concat() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    let init = sample_mpu_init("up", "k");
    store.mpu_create("b", "up", &init).unwrap();

    let first = Bytes::from_static(b"AAAA");
    let second = Bytes::from_static(b"BBBBBB");
    let third = Bytes::from_static(b"CC");
    for (part_no, part) in [(1u32, &first), (2, &second), (3, &third)] {
        store
            .mpu_put_part("b", "up", part_no, BodySource::Bytes(part.clone()), "e")
            .unwrap();
    }

    let expected: Vec<u8> = [&first[..], &second[..], &third[..]].concat();
    let meta = sample_meta("k", expected.len() as u64);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let body_path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk");
    };
    assert_eq!(std::fs::read(&body_path).unwrap(), expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2177
/// Three 1 MiB parts supplied as files (`BodySource::File`) are completed
/// into a single body equal to their concatenation.
#[test]
fn mpu_complete_large_streaming_via_file_source() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    let init = sample_mpu_init("up", "k");
    store.mpu_create("b", "up", &init).unwrap();

    const PART_SIZE: usize = 1024 * 1024;
    // Each part is filled with its own byte so misordered parts are detectable.
    let patterns: [u8; 3] = [0x11, 0x22, 0x33];
    let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
    for (part_no, fill) in (1u32..).zip(patterns.iter().copied()) {
        let part_file = tmp.path().join(format!("src-{}.bin", part_no));
        let chunk = vec![fill; PART_SIZE];
        std::fs::write(&part_file, &chunk).unwrap();
        expected.extend_from_slice(&chunk);
        store
            .mpu_put_part("b", "up", part_no, BodySource::File(part_file), "et")
            .unwrap();
    }

    let meta = sample_meta("k", expected.len() as u64);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let body_path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk");
    };
    let actual = std::fs::read(&body_path).unwrap();
    assert_eq!(actual.len(), expected.len());
    assert_eq!(actual, expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2223
/// A multipart upload started through one store instance can be resumed
/// through a second instance opened on the same root: the second store
/// sees the two existing parts after `load`, accepts a third part, and
/// completes the upload into the concatenation of all three.
#[test]
fn mpu_resumable_across_load() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                ..Default::default()
            },
        )
        .unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();
    let p1 = Bytes::from_static(b"hello-");
    let p2 = Bytes::from_static(b"world-");
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
        .unwrap();
    store
        .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
        .unwrap();

    // A second store over the same directory, with its own fresh cache.
    let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
    let loaded = store2.load().unwrap();
    let snap = loaded.buckets.get("b").unwrap();
    let m = snap.multipart_uploads.get("up").unwrap();
    assert_eq!(m.parts.len(), 2);
    assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
    assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");

    // Continue the upload through the second store and finish it there.
    let p3 = Bytes::from_static(b"again!");
    store2
        .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
        .unwrap();
    let mut expected = Vec::new();
    expected.extend_from_slice(&p1);
    expected.extend_from_slice(&p2);
    expected.extend_from_slice(&p3);
    let meta = sample_meta("k", expected.len() as u64);
    let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let path = match body_ref {
        BodyRef::Disk { path, .. } => path,
        // Use the same message as the sibling tests so a failure here is
        // self-explanatory instead of a bare "explicit panic".
        _ => panic!("expected Disk"),
    };
    assert_eq!(std::fs::read(&path).unwrap(), expected);
    assert!(!store2.mpu_dir("b", "up").exists());
}
2276
/// A `TagsSnapshot` serialized to TOML and stored as the Tags subresource
/// is returned by `load` under "tags.toml" and decodes to the same tags.
#[test]
fn tags_snapshot_roundtrip_via_store() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let tags: BTreeMap<String, String> = [("env", "prod"), ("team", "s3")]
        .iter()
        .map(|&(name, value)| (name.to_string(), value.to_string()))
        .collect();
    let snapshot = TagsSnapshot { tags: tags.clone() };
    let encoded = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Tags, &encoded)
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored_text = bucket.subresources.get("tags.toml").cloned().unwrap();
    let decoded: TagsSnapshot = toml::from_str(&stored_text).unwrap();
    assert_eq!(decoded.tags, tags);
}
2310
/// An `AclSnapshot` with two grants, serialized to TOML and stored as the
/// Acl subresource, is returned by `load` under "acl.toml" and decodes
/// with the same owner and grants in the same order.
#[test]
fn acl_snapshot_roundtrip_via_store() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let owner_grant = AclGrantSnapshot {
        grantee_type: String::from("CanonicalUser"),
        grantee_id: Some(String::from("owner-abc")),
        grantee_display_name: Some(String::from("owner")),
        grantee_uri: None,
        permission: String::from("FULL_CONTROL"),
    };
    let all_users_grant = AclGrantSnapshot {
        grantee_type: String::from("Group"),
        grantee_id: None,
        grantee_display_name: None,
        grantee_uri: Some(String::from(
            "http://acs.amazonaws.com/groups/global/AllUsers",
        )),
        permission: String::from("READ"),
    };
    let snapshot = AclSnapshot {
        owner_id: String::from("owner-abc"),
        grants: vec![owner_grant, all_users_grant],
    };
    let encoded = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &encoded)
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored_text = bucket.subresources.get("acl.toml").cloned().unwrap();
    let decoded: AclSnapshot = toml::from_str(&stored_text).unwrap();
    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
    assert_eq!(decoded.grants[1].grantee_type, "Group");
}
2364
/// An `InventorySnapshot` holding two configurations, serialized to TOML
/// and stored as the Inventory subresource, is returned by `load` under
/// "inventory.toml" and decodes to the same map.
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let mut configs = BTreeMap::new();
    for id in ["inv-1", "inv-2"] {
        // The value embeds the id, which itself contains quote characters.
        configs.insert(
            id.to_string(),
            format!("<InventoryConfiguration id=\"{}\"/>", id),
        );
    }
    let snapshot = InventorySnapshot {
        configs: configs.clone(),
    };
    let encoded = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &encoded)
        .unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored_text = bucket
        .subresources
        .get("inventory.toml")
        .cloned()
        .unwrap();
    let decoded: InventorySnapshot = toml::from_str(&stored_text).unwrap();
    assert_eq!(decoded.configs, configs);
}
2406
/// When the bucket metadata carries no versioning value but a
/// `versioning.toml` file containing "Enabled" sits in the bucket
/// directory, `load` reports `meta.versioning` as "Enabled".
#[test]
fn legacy_versioning_file_is_read() {
    let scratch = TempDir::new().unwrap();
    let store = new_store(&scratch);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Written directly to disk, bypassing the store API.
    let versioning_file = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&versioning_file, "Enabled").unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert_eq!(bucket.meta.versioning.as_deref(), Some("Enabled"));
}
2428}