1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Complete S3 state as returned by [`S3Store::load`].
///
/// Every field is `#[serde(default)]`, so a serialized document that omits
/// any of them still deserializes, using the field type's `Default` value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    /// Account identifier; empty when absent from the serialized form.
    #[serde(default)]
    pub account_id: String,
    /// Region name; empty when absent from the serialized form.
    #[serde(default)]
    pub region: String,
    /// Bucket snapshots. `DiskS3Store::load` keys this map by the bucket's
    /// `meta.name`.
    #[serde(default)]
    pub buckets: BTreeMap<String, BucketSnapshot>,
}
18
/// Everything a store knows about a single bucket.
///
/// `meta` has no serde default and is therefore required when deserializing;
/// the remaining maps fall back to empty.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket-level metadata.
    pub meta: BucketMeta,
    /// Current object per key, as populated by `DiskS3Store::load`.
    #[serde(default)]
    pub objects: BTreeMap<String, LoadedObject>,
    /// Version history per key. `DiskS3Store::load` sorts each list by
    /// ascending `last_modified`, so the newest version is last.
    #[serde(default)]
    pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
    /// Raw subresource payloads. `DiskS3Store::load` keys this by the
    /// subresource file name (for example `"tags.toml"`) and stores the file
    /// text unmodified.
    #[serde(default)]
    pub subresources: BTreeMap<String, String>,
    /// In-progress multipart uploads; `DiskS3Store::load` keys this by
    /// upload id.
    #[serde(default)]
    pub multipart_uploads: BTreeMap<String, LoadedMpu>,
}
31
/// A multipart upload together with the parts stored for it so far.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Parameters recorded when the upload was created (required field).
    pub init: MpuInit,
    /// Stored parts; `DiskS3Store::load` keys this by part number.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One multipart-upload part: its metadata plus a reference to its bytes.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata (required field).
    pub meta: UploadPartMeta,
    /// Where the part body lives. When missing from the serialized form this
    /// is `BodyRef::default()`, i.e. an empty in-memory body.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One object (or object version): its metadata plus a reference to its bytes.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata (required field).
    pub meta: ObjectMeta,
    /// Where the object body lives. When missing from the serialized form this
    /// is `BodyRef::default()`, i.e. an empty in-memory body.
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata. `DiskS3Store` persists it as `meta.toml` inside the
/// bucket directory.
///
/// All fields are optional when deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name; `DiskS3Store::load` uses it as the key in
    /// `S3State::buckets`.
    #[serde(default)]
    pub name: String,
    /// Creation timestamp. A missing value deserializes to `default_time()`.
    /// Note that `#[derive(Default)]` does not go through `default_time`; it
    /// uses `DateTime<Utc>`'s own `Default` implementation.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    /// Region recorded for the bucket.
    #[serde(default)]
    pub region: String,
    /// Versioning status string. When this is `None`, `DiskS3Store::load`
    /// fills it from the trimmed contents of `versioning.toml`, if that file
    /// exists and is non-empty.
    #[serde(default)]
    pub versioning: Option<String>,
    /// ACL payload, if any. NOTE(review): the format is not visible here.
    #[serde(default)]
    pub acl: Option<String>,
    /// Owner id associated with the ACL.
    #[serde(default)]
    pub acl_owner_id: String,
    /// Transfer-acceleration status, if set.
    #[serde(default)]
    pub accelerate_status: Option<String>,
    /// EventBridge flag; `false` when absent.
    #[serde(default)]
    pub eventbridge_enabled: bool,
    /// Lifecycle "transition default minimum object size" setting, if set.
    #[serde(default)]
    pub lifecycle_transition_default_min_size: Option<String>,
}
74
75fn default_time() -> DateTime<Utc> {
76 DateTime::<Utc>::MIN_UTC
77}
78
/// Serializable form of a single ACL grant.
///
/// `grantee_type` and `permission` carry no serde default and are therefore
/// required when deserializing; the grantee details are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Kind of grantee. NOTE(review): the accepted values are not visible in
    /// this file.
    pub grantee_type: String,
    /// Grantee id, if any.
    #[serde(default)]
    pub grantee_id: Option<String>,
    /// Grantee display name, if any.
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    /// Grantee URI, if any.
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Permission string granted to the grantee.
    pub permission: String,
}
90
/// Serializable wrapper around a tag set (tag name to tag value).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    /// Tag pairs; empty when absent from the serialized form.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
}
96
/// Serializable form of an ACL: an owner id plus a list of grants.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    /// Owner id; empty when absent from the serialized form.
    #[serde(default)]
    pub owner_id: String,
    /// Grants in stored order; empty when absent.
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}
104
/// Serializable wrapper around a set of inventory configurations.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    /// String-to-string map of configurations.
    /// NOTE(review): presumably keyed by configuration id with the raw
    /// configuration as value — confirm against the caller.
    #[serde(default)]
    pub configs: BTreeMap<String, String>,
}
110
/// Metadata stored alongside an object (or object version).
///
/// `DiskS3Store` writes this as `<version-tag>.toml` next to the body file.
/// Every field has a serde default, so older or partial documents still load.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses this (not the directory name) as
    /// the key in the loaded maps.
    #[serde(default)]
    pub key: String,
    /// Content type recorded for the object.
    #[serde(default)]
    pub content_type: String,
    /// Entity tag recorded for the object.
    #[serde(default)]
    pub etag: String,
    /// Size recorded in the metadata, in bytes.
    #[serde(default)]
    pub size: u64,
    /// Last-modified timestamp; a missing value deserializes to
    /// `default_time()`. `DiskS3Store::load` orders versions by this field.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    /// Metadata key/value pairs.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    /// Object tags.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// Storage class string.
    #[serde(default)]
    pub storage_class: String,
    /// ACL grants attached to the object.
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    /// ACL owner id, if recorded.
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    /// Number of parts, if recorded.
    #[serde(default)]
    pub parts_count: Option<u32>,
    /// NOTE(review): presumably `(part_number, size)` pairs — confirm.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    /// Server-side-encryption algorithm, if any.
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    /// KMS key id used for server-side encryption, if any.
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Bucket-key flag for server-side encryption, if recorded.
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// Version id. `DiskS3Store::load` treats a file tagged `null` whose
    /// `version_id` is `None` as the plain (unversioned) current object.
    #[serde(default)]
    pub version_id: Option<String>,
    /// Marks a delete marker. `DiskS3Store` stores no body file for these and
    /// represents the body as an empty in-memory `BodyRef`.
    #[serde(default)]
    pub is_delete_marker: bool,
    /// Restore-in-progress flag, if recorded.
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    /// Restore expiry, kept as a string.
    #[serde(default)]
    pub restore_expiry: Option<String>,
    /// Checksum algorithm name, if any.
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    /// Checksum value, if any.
    #[serde(default)]
    pub checksum_value: Option<String>,
    /// Object-lock mode, if any.
    #[serde(default)]
    pub lock_mode: Option<String>,
    /// Object-lock retain-until timestamp, if any.
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    /// Object-lock legal-hold status, if any.
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    /// Content encoding, if any.
    #[serde(default)]
    pub content_encoding: Option<String>,
    /// Website redirect location, if any.
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
166
/// Parameters recorded when a multipart upload is created. `DiskS3Store`
/// persists this as `init.toml` inside the upload directory.
///
/// `upload_id` and `key` have no serde default and are required when
/// deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    /// Upload identifier; `DiskS3Store::load` keys loaded uploads by it.
    pub upload_id: String,
    /// Key recorded for the upload.
    pub key: String,
    /// Initiation timestamp; a missing value deserializes to `default_time()`.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    /// Metadata key/value pairs supplied at initiation.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    /// Content type supplied at initiation.
    #[serde(default)]
    pub content_type: String,
    /// Storage class supplied at initiation.
    #[serde(default)]
    pub storage_class: String,
    /// Server-side-encryption algorithm, if any.
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    /// KMS key id for server-side encryption, if any.
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Tagging value kept as a single string.
    /// NOTE(review): encoding not visible here — confirm against the caller.
    #[serde(default)]
    pub tagging: Option<String>,
    /// ACL grants supplied at initiation.
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    /// Checksum algorithm name, if any.
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}
190
/// Metadata for one uploaded part. `DiskS3Store::mpu_put_part` writes it as
/// `<part_number>.toml` next to the part body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    /// Part number (required when deserializing).
    pub part_number: u32,
    /// Entity tag of the part (required when deserializing).
    pub etag: String,
    /// Part size in bytes (required when deserializing).
    pub size: u64,
    /// Timestamp of the write; a missing value deserializes to
    /// `default_time()`.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
199
/// Kinds of per-bucket subresource a store can persist.
///
/// `DiskS3Store` maps each variant to a fixed file name inside the bucket
/// directory. When adding a variant, `ALL_SUBRESOURCES` has to be extended by
/// hand as well; the compiler does not enforce that.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
226
/// Every `BucketSubresource` variant, in declaration order.
///
/// `DiskS3Store::load` iterates this slice to discover which subresource
/// files exist for a bucket, so a variant missing from here is never loaded.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
    BucketSubresource::Analytics,
    BucketSubresource::IntelligentTiering,
    BucketSubresource::Metrics,
    BucketSubresource::RequestPayment,
    BucketSubresource::Abac,
    BucketSubresource::MetadataConfiguration,
    BucketSubresource::MetadataTableConfiguration,
];
252
/// Handle to the bytes of an object or multipart part.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Bytes held directly in memory. The variant is `#[serde(skip)]`, so it
    /// is excluded from serialization and deserialization.
    #[serde(skip)]
    Memory(Bytes),
    /// Bytes stored in a file on disk.
    Disk {
        /// Bucket part of the cache identity.
        bucket: String,
        /// Key part of the cache identity.
        key: String,
        /// Version part of the cache identity; `None` when absent.
        #[serde(default)]
        version: Option<String>,
        /// Location of the body file.
        path: PathBuf,
        /// Recorded size of the body in bytes.
        size: u64,
    },
}
266
267impl BodyRef {
268 pub fn size(&self) -> u64 {
269 match self {
270 BodyRef::Memory(b) => b.len() as u64,
271 BodyRef::Disk { size, .. } => *size,
272 }
273 }
274}
275
276impl Default for BodyRef {
277 fn default() -> Self {
278 BodyRef::Memory(Bytes::new())
279 }
280}
281
/// Where the bytes for a write come from.
#[derive(Debug)]
pub enum BodySource {
    /// Bytes already in memory.
    Bytes(Bytes),
    /// A file the store may consume: `read_body_source` deletes it after
    /// reading, and `DiskS3Store` hands it to `write_atomic_from_file`.
    File(PathBuf),
    /// A file that is left in place: `read_body_source` only reads it, and
    /// `DiskS3Store` hands it to `write_atomic_copy_from_file`.
    FileCopy(PathBuf),
}
293
294fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
300 match body {
301 BodySource::Bytes(b) => Ok(b),
302 BodySource::File(p) => {
303 let bytes = std::fs::read(&p)?;
304 let _ = std::fs::remove_file(&p);
307 Ok(Bytes::from(bytes))
308 }
309 BodySource::FileCopy(p) => {
310 let bytes = std::fs::read(&p)?;
311 Ok(Bytes::from(bytes))
312 }
313 }
314}
315
/// Errors produced by `S3Store` implementations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying filesystem failure; created automatically from
    /// `std::io::Error` through `?` (via `#[from]`).
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// (De)serialization failure, carrying the rendered error message.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The requested operation is not available on this store.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by a free-form message.
    #[error("{0}")]
    Other(String),
}

/// Result alias used by every store operation.
pub type StoreResult<T> = Result<T, StoreError>;
329
/// Persistence backend for S3 state.
///
/// Implementations must be `Send + Sync`. Two implementations are defined in
/// this file: `MemoryS3Store`, which persists nothing, and `DiskS3Store`,
/// which writes under a root directory.
pub trait S3Store: Send + Sync {
    /// Loads whatever state the backend has persisted.
    fn load(&self) -> StoreResult<S3State>;

    /// Persists `meta` as the metadata of `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persists the raw `payload` of the subresource `kind` for `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Removes the subresource `kind` of `bucket`.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Removes everything persisted for `bucket`.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Stores an object body together with its metadata and returns a
    /// `BodyRef` through which the body can later be opened.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Rewrites only the metadata of an object.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Removes an object (or one specific version of it).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Returns the bytes referenced by `body`.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Records the start of a multipart upload.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Stores one part of a multipart upload and returns a reference to its
    /// body.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<BodyRef>;
    /// Directory offered by the store for spooling files, if it has one.
    /// The default implementation returns `None`.
    fn spool_dir(&self) -> Option<std::path::PathBuf> {
        None
    }
    /// Discards a multipart upload and its parts.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Finalizes a multipart upload as the object `final_key` and returns a
    /// reference to the resulting body.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
394
/// Store that persists nothing: bodies stay in memory and every metadata
/// write is a no-op.
///
/// The type is a stateless unit struct, so it derives `Debug`, `Clone` and
/// `Copy`; this lets values that embed it be debug-printed and copied freely.
#[derive(Debug, Clone, Copy)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the (stateless) in-memory store.
    pub fn new() -> Self {
        Self
    }
}

impl Default for MemoryS3Store {
    /// Equivalent to [`MemoryS3Store::new`].
    fn default() -> Self {
        Self::new()
    }
}
408
409impl S3Store for MemoryS3Store {
410 fn load(&self) -> StoreResult<S3State> {
411 Ok(S3State::default())
412 }
413
414 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
415 Ok(())
416 }
417 fn put_bucket_subresource(
418 &self,
419 _bucket: &str,
420 _kind: BucketSubresource,
421 _payload: &str,
422 ) -> StoreResult<()> {
423 Ok(())
424 }
425 fn delete_bucket_subresource(
426 &self,
427 _bucket: &str,
428 _kind: BucketSubresource,
429 ) -> StoreResult<()> {
430 Ok(())
431 }
432 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
433 Ok(())
434 }
435
436 fn put_object(
437 &self,
438 _bucket: &str,
439 _key: &str,
440 _version: Option<&str>,
441 body: BodySource,
442 _meta: &ObjectMeta,
443 ) -> StoreResult<BodyRef> {
444 Ok(BodyRef::Memory(read_body_source(body)?))
445 }
446 fn put_object_meta(
447 &self,
448 _bucket: &str,
449 _key: &str,
450 _version: Option<&str>,
451 _meta: &ObjectMeta,
452 ) -> StoreResult<()> {
453 Ok(())
454 }
455 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
456 Ok(())
457 }
458 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
459 match body {
460 BodyRef::Memory(b) => Ok(b.clone()),
461 BodyRef::Disk { .. } => {
462 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
463 }
464 }
465 }
466
467 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
468 Ok(())
469 }
470 fn mpu_put_part(
471 &self,
472 _bucket: &str,
473 _upload_id: &str,
474 _part_number: u32,
475 body: BodySource,
476 _etag: &str,
477 ) -> StoreResult<BodyRef> {
478 Ok(BodyRef::Memory(read_body_source(body)?))
481 }
482 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
483 Ok(())
484 }
485 fn mpu_complete(
486 &self,
487 _bucket: &str,
488 _upload_id: &str,
489 _final_key: &str,
490 _version: Option<&str>,
491 _meta: &ObjectMeta,
492 ) -> StoreResult<BodyRef> {
493 Ok(BodyRef::Memory(Bytes::new()))
494 }
495}
496
/// Store that persists buckets, objects and multipart uploads as files under
/// a root directory, with a shared cache for object bodies.
pub struct DiskS3Store {
    /// Root directory; bucket data lives under `<root>/buckets`.
    root: PathBuf,
    /// Body cache consulted by `open_object_body` and kept up to date by the
    /// write and delete operations.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
501
502impl DiskS3Store {
503 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
504 Self { root, cache }
505 }
506
507 fn buckets_dir(&self) -> PathBuf {
508 self.root.join("buckets")
509 }
510
511 fn bucket_dir(&self, bucket: &str) -> PathBuf {
512 self.buckets_dir()
513 .join(crate::key_escape::escape_key_segment(bucket))
514 }
515
516 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
517 self.bucket_dir(bucket)
518 .join("objects")
519 .join(crate::key_escape::escape_key_segment(key))
520 }
521
522 fn version_tag(version: Option<&str>) -> String {
523 version.unwrap_or("null").to_string()
524 }
525
526 fn object_paths(
527 &self,
528 bucket: &str,
529 key: &str,
530 version: Option<&str>,
531 ) -> (PathBuf, PathBuf, PathBuf) {
532 let dir = self.object_dir(bucket, key);
533 let tag = Self::version_tag(version);
534 let bin = dir.join(format!("{}.bin", tag));
535 let toml = dir.join(format!("{}.toml", tag));
536 (dir, bin, toml)
537 }
538
539 fn subresource_filename(kind: BucketSubresource) -> &'static str {
540 match kind {
541 BucketSubresource::Tags => "tags.toml",
542 BucketSubresource::Lifecycle => "lifecycle.toml",
543 BucketSubresource::Cors => "cors.toml",
544 BucketSubresource::Policy => "policy.toml",
545 BucketSubresource::Notification => "notification.toml",
546 BucketSubresource::Logging => "logging.toml",
547 BucketSubresource::Website => "website.toml",
548 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
549 BucketSubresource::ObjectLock => "object_lock.toml",
550 BucketSubresource::Replication => "replication.toml",
551 BucketSubresource::Ownership => "ownership.toml",
552 BucketSubresource::Inventory => "inventory.toml",
553 BucketSubresource::Encryption => "encryption.toml",
554 BucketSubresource::Versioning => "versioning.toml",
555 BucketSubresource::Acl => "acl.toml",
556 BucketSubresource::Accelerate => "accelerate.toml",
557 BucketSubresource::Analytics => "analytics.toml",
558 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
559 BucketSubresource::Metrics => "metrics.toml",
560 BucketSubresource::RequestPayment => "request_payment.toml",
561 BucketSubresource::Abac => "abac.toml",
562 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
563 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
564 }
565 }
566
567 fn cleanup_empty(dir: &std::path::Path) {
568 let _ = std::fs::remove_dir(dir);
569 }
570
571 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
572 self.bucket_dir(bucket)
573 .join("mpu")
574 .join(crate::key_escape::escape_key_segment(upload_id))
575 }
576
577 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
578 self.mpu_dir(bucket, upload_id).join("parts")
579 }
580
581 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
582 self.mpu_parts_dir(bucket, upload_id)
583 .join(format!("{}.bin", part_number))
584 }
585
586 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
587 self.mpu_parts_dir(bucket, upload_id)
588 .join(format!("{}.toml", part_number))
589 }
590
591 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
592 crate::cache::BodyKey::new(
593 bucket.to_string(),
594 format!("__mpu__/{}", upload_id),
595 Some(format!("part-{}", part_number)),
596 )
597 }
598}
599
600fn io_other(msg: impl Into<String>) -> StoreError {
601 StoreError::Other(msg.into())
602}
603
604impl S3Store for DiskS3Store {
/// Rebuilds an `S3State` by scanning `<root>/buckets`.
///
/// Each sub-directory that contains a `meta.toml` becomes one bucket, with
/// its subresource files, objects / object versions and in-progress
/// multipart uploads attached. `account_id` and `region` of the returned
/// state are left at their defaults.
///
/// # Errors
/// Failing to enumerate the buckets directory itself is returned to the
/// caller. A failure *inside* a bucket (unreadable file, TOML parse error,
/// missing body file) does not fail the load: that bucket is skipped and a
/// warning is logged.
fn load(&self) -> StoreResult<S3State> {
    let mut state = S3State::default();
    let buckets_dir = self.buckets_dir();
    // Nothing has been persisted yet: report an empty state, not an error.
    if !buckets_dir.exists() {
        return Ok(state);
    }
    for entry in std::fs::read_dir(&buckets_dir)? {
        let entry = entry?;
        if !entry.file_type()?.is_dir() {
            continue;
        }
        let bdir = entry.path();
        // The bucket is loaded inside a closure so that `?` aborts only this
        // bucket; the outcome is inspected (and failures logged) below.
        let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
            let meta_path = bdir.join("meta.toml");
            // A directory without meta.toml is not treated as a bucket.
            if !meta_path.exists() {
                return Ok(None);
            }
            let meta_text = std::fs::read_to_string(&meta_path)?;
            let mut meta: BucketMeta =
                toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
            let mut snap = BucketSnapshot {
                meta: meta.clone(),
                objects: BTreeMap::new(),
                object_versions: BTreeMap::new(),
                subresources: BTreeMap::new(),
                multipart_uploads: BTreeMap::new(),
            };

            // Subresources: keep the raw text of every known file that exists.
            for kind in ALL_SUBRESOURCES {
                let fname = Self::subresource_filename(*kind);
                let path = bdir.join(fname);
                if path.exists() {
                    let text = std::fs::read_to_string(&path)?;
                    // meta.toml wins; versioning.toml only fills in a missing
                    // versioning status, using its trimmed contents.
                    if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
                    {
                        let stripped = text.trim();
                        if !stripped.is_empty() {
                            snap.meta.versioning = Some(stripped.to_string());
                            meta.versioning = snap.meta.versioning.clone();
                        }
                    }
                    snap.subresources.insert(fname.to_string(), text);
                }
            }

            // Objects: one directory per key, one `<tag>.toml` (+ `<tag>.bin`)
            // pair per stored version.
            let objects_root = bdir.join("objects");
            if objects_root.exists() {
                for okey_entry in std::fs::read_dir(&objects_root)? {
                    let okey_entry = okey_entry?;
                    if !okey_entry.file_type()?.is_dir() {
                        continue;
                    }
                    let key_dir = okey_entry.path();
                    let mut versioned: Vec<LoadedObject> = Vec::new();
                    let mut key_name: Option<String> = None;
                    for version_entry in std::fs::read_dir(&key_dir)? {
                        let version_entry = version_entry?;
                        let path = version_entry.path();
                        // Skip names that are not valid UTF-8.
                        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
                            continue;
                        };
                        if !fname.ends_with(".toml") {
                            continue;
                        }
                        // Drop the 5-byte ".toml" suffix checked just above.
                        let version_tag = &fname[..fname.len() - 5];
                        let toml_text = std::fs::read_to_string(&path)?;
                        let obj_meta: ObjectMeta = toml::from_str(&toml_text)
                            .map_err(|e| StoreError::Serde(e.to_string()))?;
                        let bin_path = key_dir.join(format!("{}.bin", version_tag));
                        let (body, size) = if obj_meta.is_delete_marker {
                            // Delete markers have no body file.
                            (BodyRef::Memory(Bytes::new()), 0u64)
                        } else if bin_path.exists() {
                            // The size comes from the file, not from the metadata.
                            let sz = std::fs::metadata(&bin_path)?.len();
                            (
                                BodyRef::Disk {
                                    bucket: meta.name.clone(),
                                    key: obj_meta.key.clone(),
                                    version: if version_tag == "null" {
                                        None
                                    } else {
                                        Some(version_tag.to_string())
                                    },
                                    path: bin_path,
                                    size: sz,
                                },
                                sz,
                            )
                        } else {
                            // Metadata without a body aborts loading this bucket.
                            return Err(StoreError::Other(format!(
                                "missing body file: {}",
                                bin_path.display()
                            )));
                        };
                        // `size` is computed above but not used any further.
                        let _ = size;
                        // The key name is taken from the first metadata file read.
                        key_name.get_or_insert_with(|| obj_meta.key.clone());
                        // A "null"-tagged file without a version id is the
                        // plain current object; everything else is a version.
                        if version_tag == "null" && obj_meta.version_id.is_none() {
                            snap.objects.insert(
                                obj_meta.key.clone(),
                                LoadedObject {
                                    meta: obj_meta,
                                    body,
                                },
                            );
                        } else {
                            versioned.push(LoadedObject {
                                meta: obj_meta,
                                body,
                            });
                        }
                    }
                    if !versioned.is_empty() {
                        // Oldest first, so the newest version is last.
                        versioned.sort_by_key(|v| v.meta.last_modified);
                        if let Some(key) = key_name {
                            // The newest version decides the current object:
                            // a delete marker hides the key, anything else
                            // replaces whatever was registered for it.
                            match versioned.last() {
                                Some(newest) if newest.meta.is_delete_marker => {
                                    snap.objects.remove(&key);
                                }
                                Some(newest) => {
                                    snap.objects.insert(key.clone(), newest.clone());
                                }
                                None => {}
                            }
                            snap.object_versions.insert(key, versioned);
                        }
                    }
                }
            }

            // Multipart uploads: one directory per upload, holding `init.toml`
            // and a `parts/` directory of `<n>.toml` / `<n>.bin` pairs.
            let mpu_root = bdir.join("mpu");
            if mpu_root.exists() {
                for upload_entry in std::fs::read_dir(&mpu_root)? {
                    let upload_entry = upload_entry?;
                    if !upload_entry.file_type()?.is_dir() {
                        continue;
                    }
                    let upload_dir = upload_entry.path();
                    let init_path = upload_dir.join("init.toml");
                    // Upload directories without init.toml are ignored.
                    if !init_path.exists() {
                        continue;
                    }
                    let init_text = std::fs::read_to_string(&init_path)?;
                    let init: MpuInit = toml::from_str(&init_text)
                        .map_err(|e| StoreError::Serde(e.to_string()))?;
                    let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
                    let parts_dir = upload_dir.join("parts");
                    if parts_dir.exists() {
                        for part_entry in std::fs::read_dir(&parts_dir)? {
                            let part_entry = part_entry?;
                            let path = part_entry.path();
                            let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
                                continue;
                            };
                            if !fname.ends_with(".toml") {
                                continue;
                            }
                            let stem = &fname[..fname.len() - 5];
                            // Files whose stem is not a part number are ignored.
                            let Ok(part_number) = stem.parse::<u32>() else {
                                continue;
                            };
                            let toml_text = std::fs::read_to_string(&path)?;
                            let part_meta: UploadPartMeta = toml::from_str(&toml_text)
                                .map_err(|e| StoreError::Serde(e.to_string()))?;
                            let bin_path = parts_dir.join(format!("{}.bin", part_number));
                            if !bin_path.exists() {
                                return Err(StoreError::Other(format!(
                                    "missing multipart part body file: {}",
                                    bin_path.display()
                                )));
                            }
                            let sz = std::fs::metadata(&bin_path)?.len();
                            // Same identity shape that `mpu_body_key` produces.
                            let body = BodyRef::Disk {
                                bucket: meta.name.clone(),
                                key: format!("__mpu__/{}", init.upload_id),
                                version: Some(format!("part-{}", part_number)),
                                path: bin_path,
                                size: sz,
                            };
                            loaded_parts.insert(
                                part_number,
                                LoadedPart {
                                    meta: part_meta,
                                    body,
                                },
                            );
                        }
                    }
                    snap.multipart_uploads.insert(
                        init.upload_id.clone(),
                        LoadedMpu {
                            init,
                            parts: loaded_parts,
                        },
                    );
                }
            }

            Ok(Some(snap))
        })();
        match loaded {
            Ok(Some(snap)) => {
                state.buckets.insert(snap.meta.name.clone(), snap);
            }
            Ok(None) => {}
            // One broken bucket must not prevent the others from loading.
            Err(e) => tracing::warn!(
                bucket = %bdir.display(),
                error = %e,
                "skipping unreadable S3 bucket during load"
            ),
        }
    }
    Ok(state)
}
831
832 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
833 let dir = self.bucket_dir(bucket);
834 std::fs::create_dir_all(&dir)?;
835 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
836 Ok(())
837 }
838
839 fn put_bucket_subresource(
840 &self,
841 bucket: &str,
842 kind: BucketSubresource,
843 payload: &str,
844 ) -> StoreResult<()> {
845 let dir = self.bucket_dir(bucket);
846 std::fs::create_dir_all(&dir)?;
847 let path = dir.join(Self::subresource_filename(kind));
848 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
849 Ok(())
850 }
851
852 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
853 let path = self
854 .bucket_dir(bucket)
855 .join(Self::subresource_filename(kind));
856 match std::fs::remove_file(&path) {
857 Ok(_) => Ok(()),
858 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
859 Err(e) => Err(e.into()),
860 }
861 }
862
863 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
864 let dir = self.bucket_dir(bucket);
865 match std::fs::remove_dir_all(&dir) {
866 Ok(_) => Ok(()),
867 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
868 Err(e) => Err(e.into()),
869 }
870 }
871
872 fn put_object(
873 &self,
874 bucket: &str,
875 key: &str,
876 version: Option<&str>,
877 body: BodySource,
878 meta: &ObjectMeta,
879 ) -> StoreResult<BodyRef> {
880 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
881 std::fs::create_dir_all(&dir)?;
882
883 if meta.is_delete_marker {
884 crate::atomic::write_atomic_toml(&toml_path, meta)?;
885 return Ok(BodyRef::Memory(Bytes::new()));
886 }
887
888 let size: u64;
889 let bytes_for_cache: Option<Bytes>;
890 match body {
891 BodySource::Bytes(b) => {
892 size = b.len() as u64;
893 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
894 bytes_for_cache = Some(b);
895 }
896 BodySource::File(src) => {
897 let src_size = std::fs::metadata(&src)?.len();
898 size = src_size;
899 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
900 bytes_for_cache = None;
901 }
902 BodySource::FileCopy(src) => {
903 let src_size = std::fs::metadata(&src)?.len();
904 size = src_size;
905 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
906 bytes_for_cache = None;
907 }
908 }
909
910 crate::atomic::write_atomic_toml(&toml_path, meta)?;
911
912 let body_key = crate::cache::BodyKey::new(
913 bucket.to_string(),
914 key.to_string(),
915 version.map(|s| s.to_string()),
916 );
917 if let Some(b) = bytes_for_cache {
918 self.cache.insert(body_key, b);
919 } else {
920 self.cache.invalidate(&crate::cache::BodyKey::new(
921 bucket.to_string(),
922 key.to_string(),
923 version.map(|s| s.to_string()),
924 ));
925 }
926
927 Ok(BodyRef::Disk {
928 bucket: bucket.to_string(),
929 key: key.to_string(),
930 version: version.map(|s| s.to_string()),
931 path: bin_path,
932 size,
933 })
934 }
935
936 fn put_object_meta(
937 &self,
938 bucket: &str,
939 key: &str,
940 version: Option<&str>,
941 meta: &ObjectMeta,
942 ) -> StoreResult<()> {
943 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
944 std::fs::create_dir_all(&dir)?;
945 crate::atomic::write_atomic_toml(&toml_path, meta)?;
946 Ok(())
947 }
948
949 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
950 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
951 for p in [&bin_path, &toml_path] {
952 match std::fs::remove_file(p) {
953 Ok(_) => {}
954 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
955 Err(e) => return Err(e.into()),
956 }
957 }
958 Self::cleanup_empty(&dir);
959
960 self.cache.invalidate(&crate::cache::BodyKey::new(
961 bucket.to_string(),
962 key.to_string(),
963 version.map(|s| s.to_string()),
964 ));
965 Ok(())
966 }
967
968 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
969 match body {
970 BodyRef::Memory(b) => Ok(b.clone()),
971 BodyRef::Disk {
972 bucket,
973 key,
974 version,
975 path,
976 size: _,
977 } => {
978 let body_key =
979 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
980 if let Some(bytes) = self.cache.get(&body_key) {
981 return Ok(bytes);
982 }
983 let bytes = Bytes::from(std::fs::read(path)?);
984 self.cache.insert(body_key, bytes.clone());
985 Ok(bytes)
986 }
987 }
988 }
989
990 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
991 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
992 std::fs::create_dir_all(&parts_dir)?;
993 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
994 crate::atomic::write_atomic_toml(&init_path, init)?;
995 Ok(())
996 }
997
998 fn mpu_put_part(
999 &self,
1000 bucket: &str,
1001 upload_id: &str,
1002 part_number: u32,
1003 body: BodySource,
1004 etag: &str,
1005 ) -> StoreResult<BodyRef> {
1006 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1007 std::fs::create_dir_all(&parts_dir)?;
1008 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1009 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1010
1011 let size: u64 = match body {
1012 BodySource::Bytes(b) => {
1013 let n = b.len() as u64;
1014 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1015 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1016 self.cache.insert(cache_key, b);
1017 n
1018 }
1019 BodySource::File(src) => {
1020 let n = std::fs::metadata(&src)?.len();
1021 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1022 self.cache
1023 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1024 n
1025 }
1026 BodySource::FileCopy(src) => {
1027 let n = std::fs::metadata(&src)?.len();
1028 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1029 self.cache
1030 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1031 n
1032 }
1033 };
1034
1035 let meta = UploadPartMeta {
1036 part_number,
1037 etag: etag.to_string(),
1038 size,
1039 last_modified: Utc::now(),
1040 };
1041 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1042 Ok(BodyRef::Disk {
1045 bucket: bucket.to_string(),
1046 key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1047 version: None,
1048 path: bin_path,
1049 size,
1050 })
1051 }
1052
1053 fn spool_dir(&self) -> Option<std::path::PathBuf> {
1054 let dir = self.root.join(".spool");
1055 let _ = std::fs::create_dir_all(&dir);
1059 Some(dir)
1060 }
1061
1062 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1063 let dir = self.mpu_dir(bucket, upload_id);
1064 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1066 for entry in entries.flatten() {
1067 let path = entry.path();
1068 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1069 if let Some(stem) = fname.strip_suffix(".bin") {
1070 if let Ok(n) = stem.parse::<u32>() {
1071 self.cache
1072 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1073 }
1074 }
1075 }
1076 }
1077 }
1078 match std::fs::remove_dir_all(&dir) {
1079 Ok(_) => Ok(()),
1080 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1081 Err(e) => Err(e.into()),
1082 }
1083 }
1084
/// Finalizes a multipart upload as the object `final_key`.
///
/// The parts used are all parts found on disk that have both a `<n>.toml`
/// and a `<n>.bin` file, in ascending part-number order; this method takes
/// no explicit part list. A single part is moved into place with a rename
/// (falling back to a copy when the rename reports the OS error code given
/// by `libc_exdev()`); several parts are concatenated into `<bin>.tmp`,
/// which is then renamed over the final body file. Afterwards the object
/// metadata is written, the part cache entries are invalidated, the upload
/// directory is removed (best effort) and the cache entry of the final
/// object is invalidated.
///
/// Returns a `BodyRef::Disk` pointing at the final body file, with the total
/// number of bytes as `size`.
///
/// # Errors
/// Returns `StoreError::Other` when the parts directory does not exist or
/// contains no usable part, and `StoreError::Io` for filesystem failures.
fn mpu_complete(
    &self,
    bucket: &str,
    upload_id: &str,
    final_key: &str,
    version: Option<&str>,
    meta: &ObjectMeta,
) -> StoreResult<BodyRef> {
    let parts_dir = self.mpu_parts_dir(bucket, upload_id);
    if !parts_dir.exists() {
        return Err(io_other(format!(
            "mpu_complete: no parts dir for upload {}",
            upload_id
        )));
    }

    // Collect the numbers of all parts that have metadata AND a body file.
    let mut part_numbers: Vec<u32> = Vec::new();
    for entry in std::fs::read_dir(&parts_dir)? {
        let entry = entry?;
        let path = entry.path();
        let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
            continue;
        };
        if let Some(stem) = fname.strip_suffix(".toml") {
            if let Ok(n) = stem.parse::<u32>() {
                if self.mpu_part_bin(bucket, upload_id, n).exists() {
                    part_numbers.push(n);
                }
            }
        }
    }
    // Directory iteration order is unspecified; assemble in part order.
    part_numbers.sort_unstable();
    if part_numbers.is_empty() {
        return Err(io_other(format!(
            "mpu_complete: upload {} has no parts",
            upload_id
        )));
    }

    let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
    std::fs::create_dir_all(&dir)?;

    let total_size: u64 = if part_numbers.len() == 1 {
        // Single part: move the file into place instead of copying it.
        let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
        let sz = std::fs::metadata(&only)?.len();
        match std::fs::rename(&only, &bin_path) {
            Ok(_) => {}
            // The rename failed with the cross-device error code: copy into
            // a temporary file next to the target, flush it, then rename.
            Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
                {
                    let mut input = std::fs::File::open(&only)?;
                    let tmp = {
                        let mut os = bin_path.as_os_str().to_owned();
                        os.push(".tmp");
                        PathBuf::from(os)
                    };
                    let mut out = std::fs::OpenOptions::new()
                        .write(true)
                        .create(true)
                        .truncate(true)
                        .open(&tmp)?;
                    std::io::copy(&mut input, &mut out)?;
                    out.sync_all()?;
                    std::fs::rename(&tmp, &bin_path)?;
                }
                // Best effort: the upload directory is removed below anyway.
                let _ = std::fs::remove_file(&only);
            }
            Err(e) => return Err(e.into()),
        }
        // Best-effort sync of the containing directory after the rename.
        if let Some(parent) = bin_path.parent() {
            if let Ok(dir_handle) = std::fs::File::open(parent) {
                let _ = dir_handle.sync_all();
            }
        }
        sz
    } else {
        // Several parts: concatenate them into `<bin>.tmp`, then rename.
        let tmp = {
            let mut os = bin_path.as_os_str().to_owned();
            os.push(".tmp");
            PathBuf::from(os)
        };
        let mut total: u64 = 0;
        {
            let mut out = std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp)?;
            for n in &part_numbers {
                let part_path = self.mpu_part_bin(bucket, upload_id, *n);
                let mut input = std::fs::File::open(&part_path)?;
                let copied = std::io::copy(&mut input, &mut out)?;
                total += copied;
            }
            out.sync_all()?;
        }
        std::fs::rename(&tmp, &bin_path)?;
        // Best-effort sync of the containing directory after the rename.
        if let Some(parent) = bin_path.parent() {
            if let Ok(dir_handle) = std::fs::File::open(parent) {
                let _ = dir_handle.sync_all();
            }
        }
        total
    };

    // The metadata is written only after the body is in place.
    crate::atomic::write_atomic_toml(&toml_path, meta)?;

    // The part bodies are about to disappear; drop their cache entries.
    for n in &part_numbers {
        self.cache
            .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
    }
    let mpu_dir = self.mpu_dir(bucket, upload_id);
    // Best effort: a failure to remove the upload directory is ignored.
    let _ = std::fs::remove_dir_all(&mpu_dir);

    // Drop any cache entry held for this object version, since the body file
    // has just been replaced.
    self.cache.invalidate(&crate::cache::BodyKey::new(
        bucket.to_string(),
        final_key.to_string(),
        version.map(|s| s.to_string()),
    ));

    Ok(BodyRef::Disk {
        bucket: bucket.to_string(),
        key: final_key.to_string(),
        version: version.map(|s| s.to_string()),
        path: bin_path,
        size: total_size,
    })
}
1223}
1224
/// Returns the raw OS error code reported when a rename crosses a
/// filesystem (or volume) boundary.
///
/// Unix-like targets report `EXDEV`, which is 18 on Linux, macOS and the
/// BSDs. Windows reports `ERROR_NOT_SAME_DEVICE` (17) instead, so the value
/// is chosen per target rather than hard-coded to the Unix constant;
/// otherwise the copy-based fallback keyed on this code would never trigger
/// on Windows.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17
    } else {
        18
    }
}
1228
1229#[cfg(test)]
1230mod disk_tests {
1231 use super::*;
1232 use std::sync::Arc;
1233 use tempfile::TempDir;
1234
1235 fn new_store(tmp: &TempDir) -> DiskS3Store {
1236 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1237 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1238 }
1239
1240 fn new_store_with_cache(
1241 tmp: &TempDir,
1242 cap: u64,
1243 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1244 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1245 (
1246 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1247 cache,
1248 )
1249 }
1250
1251 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1252 ObjectMeta {
1253 key: key.to_string(),
1254 content_type: "application/octet-stream".to_string(),
1255 etag: "etag".to_string(),
1256 size,
1257 ..Default::default()
1258 }
1259 }
1260
1261 #[test]
1262 fn put_bucket_meta_roundtrip() {
1263 let tmp = TempDir::new().unwrap();
1264 let store = new_store(&tmp);
1265 let meta = BucketMeta {
1266 name: "b1".to_string(),
1267 region: "us-east-1".to_string(),
1268 versioning: Some("Enabled".to_string()),
1269 ..Default::default()
1270 };
1271 store.put_bucket_meta("b1", &meta).unwrap();
1272 let loaded = store.load().unwrap();
1273 let snap = loaded.buckets.get("b1").unwrap();
1274 assert_eq!(snap.meta.name, "b1");
1275 assert_eq!(snap.meta.region, "us-east-1");
1276 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1277 }
1278
1279 #[test]
1280 fn put_bucket_subresource_each_variant_writes_file() {
1281 let tmp = TempDir::new().unwrap();
1282 let store = new_store(&tmp);
1283 store
1284 .put_bucket_meta(
1285 "b",
1286 &BucketMeta {
1287 name: "b".to_string(),
1288 ..Default::default()
1289 },
1290 )
1291 .unwrap();
1292 let variants = [
1293 BucketSubresource::Tags,
1294 BucketSubresource::Lifecycle,
1295 BucketSubresource::Cors,
1296 BucketSubresource::Policy,
1297 BucketSubresource::Notification,
1298 BucketSubresource::Logging,
1299 BucketSubresource::Website,
1300 BucketSubresource::PublicAccessBlock,
1301 BucketSubresource::ObjectLock,
1302 BucketSubresource::Replication,
1303 BucketSubresource::Ownership,
1304 BucketSubresource::Inventory,
1305 BucketSubresource::Encryption,
1306 BucketSubresource::Versioning,
1307 BucketSubresource::Acl,
1308 BucketSubresource::Accelerate,
1309 ];
1310 for v in variants {
1311 store
1312 .put_bucket_subresource("b", v, "payload=true")
1313 .unwrap();
1314 let file = store
1315 .bucket_dir("b")
1316 .join(DiskS3Store::subresource_filename(v));
1317 assert!(file.exists(), "{:?}", v);
1318 assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1319 }
1320 }
1321
1322 #[test]
1323 fn delete_bucket_subresource_removes_file() {
1324 let tmp = TempDir::new().unwrap();
1325 let store = new_store(&tmp);
1326 store
1327 .put_bucket_meta(
1328 "b",
1329 &BucketMeta {
1330 name: "b".to_string(),
1331 ..Default::default()
1332 },
1333 )
1334 .unwrap();
1335 store
1336 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1337 .unwrap();
1338 store
1339 .delete_bucket_subresource("b", BucketSubresource::Tags)
1340 .unwrap();
1341 let file = store.bucket_dir("b").join("tags.toml");
1342 assert!(!file.exists());
1343 store
1345 .delete_bucket_subresource("b", BucketSubresource::Tags)
1346 .unwrap();
1347 }
1348
1349 #[test]
1350 fn put_object_bytes_roundtrip() {
1351 let tmp = TempDir::new().unwrap();
1352 let store = new_store(&tmp);
1353 store
1354 .put_bucket_meta(
1355 "b",
1356 &BucketMeta {
1357 name: "b".to_string(),
1358 ..Default::default()
1359 },
1360 )
1361 .unwrap();
1362 let data = Bytes::from_static(b"hello world");
1363 let meta = sample_meta("k1", data.len() as u64);
1364 let body_ref = store
1365 .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1366 .unwrap();
1367 match &body_ref {
1368 BodyRef::Disk {
1369 bucket,
1370 key,
1371 size,
1372 path,
1373 ..
1374 } => {
1375 assert_eq!(bucket, "b");
1376 assert_eq!(key, "k1");
1377 assert_eq!(*size, data.len() as u64);
1378 assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1379 }
1380 _ => panic!("expected Disk"),
1381 }
1382 let loaded = store.load().unwrap();
1383 let snap = loaded.buckets.get("b").unwrap();
1384 let obj = snap.objects.get("k1").unwrap();
1385 assert_eq!(obj.meta.size, data.len() as u64);
1386 }
1387
1388 #[test]
1389 fn put_object_file_copy_source_leaves_src_in_place() {
1390 let tmp = TempDir::new().unwrap();
1391 let store = new_store(&tmp);
1392 store
1393 .put_bucket_meta(
1394 "b",
1395 &BucketMeta {
1396 name: "b".to_string(),
1397 ..Default::default()
1398 },
1399 )
1400 .unwrap();
1401 let src_dir = TempDir::new().unwrap();
1402 let src = src_dir.path().join("src.bin");
1403 std::fs::write(&src, b"copied-body").unwrap();
1404 let meta = sample_meta("k", 11);
1405 let bref = store
1406 .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1407 .unwrap();
1408 match bref {
1409 BodyRef::Disk { path, size, .. } => {
1410 assert_eq!(size, 11);
1411 assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1412 }
1413 _ => panic!("expected Disk bodyref"),
1414 }
1415 assert!(src.exists(), "source file must not be moved by FileCopy");
1416 assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1417 }
1418
1419 #[test]
1420 fn put_object_file_source() {
1421 let tmp = TempDir::new().unwrap();
1422 let store = new_store(&tmp);
1423 store
1424 .put_bucket_meta(
1425 "b",
1426 &BucketMeta {
1427 name: "b".to_string(),
1428 ..Default::default()
1429 },
1430 )
1431 .unwrap();
1432 let src = tmp.path().join("src.bin");
1433 std::fs::write(&src, b"file-body").unwrap();
1434 let meta = sample_meta("k", 9);
1435 let body_ref = store
1436 .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1437 .unwrap();
1438 let path = match body_ref {
1439 BodyRef::Disk { path, .. } => path,
1440 _ => panic!(),
1441 };
1442 assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1443 }
1444
1445 #[test]
1446 fn put_object_meta_only_keeps_bin() {
1447 let tmp = TempDir::new().unwrap();
1448 let store = new_store(&tmp);
1449 store
1450 .put_bucket_meta(
1451 "b",
1452 &BucketMeta {
1453 name: "b".to_string(),
1454 ..Default::default()
1455 },
1456 )
1457 .unwrap();
1458 let data = Bytes::from_static(b"abc");
1459 let mut meta = sample_meta("k", 3);
1460 store
1461 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1462 .unwrap();
1463 let (_, bin, _) = store.object_paths("b", "k", None);
1464 let before = std::fs::read(&bin).unwrap();
1465 meta.tags.insert("x".to_string(), "y".to_string());
1466 store.put_object_meta("b", "k", None, &meta).unwrap();
1467 assert_eq!(std::fs::read(&bin).unwrap(), before);
1468 let loaded = store.load().unwrap();
1469 let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1470 assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1471 }
1472
1473 #[test]
1474 fn delete_object_cleans_up_files_and_cache() {
1475 let tmp = TempDir::new().unwrap();
1476 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1477 store
1478 .put_bucket_meta(
1479 "b",
1480 &BucketMeta {
1481 name: "b".to_string(),
1482 ..Default::default()
1483 },
1484 )
1485 .unwrap();
1486 let data = Bytes::from_static(b"bye");
1487 let meta = sample_meta("k", 3);
1488 store
1489 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1490 .unwrap();
1491 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1492 assert!(cache.get(&body_key).is_some());
1493 store.delete_object("b", "k", None).unwrap();
1494 let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1495 assert!(!bin.exists());
1496 assert!(!toml_path.exists());
1497 assert!(!dir.exists());
1498 assert!(cache.get(&body_key).is_none());
1499 }
1500
1501 #[test]
1502 fn open_object_body_cache_hit_and_refill() {
1503 let tmp = TempDir::new().unwrap();
1504 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1505 store
1506 .put_bucket_meta(
1507 "b",
1508 &BucketMeta {
1509 name: "b".to_string(),
1510 ..Default::default()
1511 },
1512 )
1513 .unwrap();
1514 let data = Bytes::from_static(b"payload");
1515 let meta = sample_meta("k", data.len() as u64);
1516 let body_ref = store
1517 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1518 .unwrap();
1519 let got = store.open_object_body(&body_ref).unwrap();
1521 assert_eq!(got, data);
1522 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1524 cache.invalidate(&body_key);
1525 assert!(cache.get(&body_key).is_none());
1526 let got = store.open_object_body(&body_ref).unwrap();
1527 assert_eq!(got, data);
1528 assert!(cache.get(&body_key).is_some());
1529 }
1530
1531 #[test]
1532 fn open_object_body_large_bypasses_cache() {
1533 let tmp = TempDir::new().unwrap();
1534 let (store, cache) = new_store_with_cache(&tmp, 1024);
1536 store
1537 .put_bucket_meta(
1538 "b",
1539 &BucketMeta {
1540 name: "b".to_string(),
1541 ..Default::default()
1542 },
1543 )
1544 .unwrap();
1545 let data = Bytes::from(vec![7u8; 800]);
1546 let meta = sample_meta("big", 800);
1547 let body_ref = store
1548 .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1549 .unwrap();
1550 let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1551 assert!(cache.get(&body_key).is_none());
1552 let got = store.open_object_body(&body_ref).unwrap();
1553 assert_eq!(got, data);
1554 assert!(cache.get(&body_key).is_none());
1556 }
1557
1558 #[test]
1559 fn load_empty_dir() {
1560 let tmp = TempDir::new().unwrap();
1561 let store = new_store(&tmp);
1562 let s = store.load().unwrap();
1563 assert!(s.buckets.is_empty());
1564 }
1565
1566 #[test]
1567 fn load_skips_mpu_without_init() {
1568 let tmp = TempDir::new().unwrap();
1569 let store = new_store(&tmp);
1570 store
1571 .put_bucket_meta(
1572 "b",
1573 &BucketMeta {
1574 name: "b".to_string(),
1575 ..Default::default()
1576 },
1577 )
1578 .unwrap();
1579 let data = Bytes::from_static(b"x");
1580 let meta = sample_meta("k", 1);
1581 store
1582 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1583 .unwrap();
1584 let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1586 std::fs::create_dir_all(&mpu).unwrap();
1587
1588 let loaded = store.load().unwrap();
1589 let snap = loaded.buckets.get("b").unwrap();
1590 assert_eq!(snap.objects.len(), 1);
1591 assert!(snap.multipart_uploads.is_empty());
1592 }
1593
1594 #[test]
1595 fn load_reads_bucket_subresources() {
1596 let tmp = TempDir::new().unwrap();
1597 let store = new_store(&tmp);
1598 store
1599 .put_bucket_meta(
1600 "b",
1601 &BucketMeta {
1602 name: "b".to_string(),
1603 ..Default::default()
1604 },
1605 )
1606 .unwrap();
1607 store
1608 .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1609 .unwrap();
1610 store
1611 .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1612 .unwrap();
1613 store
1614 .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1615 .unwrap();
1616 store
1617 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1618 .unwrap();
1619 let loaded = store.load().unwrap();
1620 let snap = loaded.buckets.get("b").unwrap();
1621 assert_eq!(
1622 snap.subresources.get("lifecycle.toml").map(String::as_str),
1623 Some("<Lifecycle/>"),
1624 );
1625 assert_eq!(
1626 snap.subresources.get("cors.toml").map(String::as_str),
1627 Some("<Cors/>"),
1628 );
1629 assert_eq!(
1630 snap.subresources.get("policy.toml").map(String::as_str),
1631 Some("{}"),
1632 );
1633 assert_eq!(
1634 snap.subresources.get("tags.toml").map(String::as_str),
1635 Some("x=1"),
1636 );
1637 }
1638
1639 #[test]
1640 fn versioned_put_load_roundtrip() {
1641 let tmp = TempDir::new().unwrap();
1642 let store = new_store(&tmp);
1643 store
1644 .put_bucket_meta(
1645 "b",
1646 &BucketMeta {
1647 name: "b".to_string(),
1648 versioning: Some("Enabled".to_string()),
1649 ..Default::default()
1650 },
1651 )
1652 .unwrap();
1653 let base = chrono::Utc::now();
1654 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1655 .iter()
1656 .enumerate()
1657 {
1658 let mut m = sample_meta("k", body.len() as u64);
1659 m.version_id = Some((*vid).to_string());
1660 m.last_modified = base + chrono::Duration::seconds(i as i64);
1661 store
1662 .put_object(
1663 "b",
1664 "k",
1665 Some(*vid),
1666 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1667 &m,
1668 )
1669 .unwrap();
1670 }
1671 let loaded = store.load().unwrap();
1672 let snap = loaded.buckets.get("b").unwrap();
1673 let versions = snap.object_versions.get("k").expect("versions present");
1674 assert_eq!(versions.len(), 3);
1675 assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1676 assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1677 assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1678 for v in versions {
1679 match &v.body {
1680 BodyRef::Disk { size, .. } => assert!(*size > 0),
1681 _ => panic!("expected Disk body"),
1682 }
1683 }
1684 }
1685
1686 #[test]
1687 fn versioned_load_promotes_latest_live_to_snap_objects() {
1688 let tmp = TempDir::new().unwrap();
1689 let store = new_store(&tmp);
1690 store
1691 .put_bucket_meta(
1692 "b",
1693 &BucketMeta {
1694 name: "b".to_string(),
1695 versioning: Some("Enabled".to_string()),
1696 ..Default::default()
1697 },
1698 )
1699 .unwrap();
1700 let base = chrono::Utc::now();
1701 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1702 .iter()
1703 .enumerate()
1704 {
1705 let mut m = sample_meta("k", body.len() as u64);
1706 m.version_id = Some((*vid).to_string());
1707 m.last_modified = base + chrono::Duration::seconds(i as i64);
1708 store
1709 .put_object(
1710 "b",
1711 "k",
1712 Some(*vid),
1713 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1714 &m,
1715 )
1716 .unwrap();
1717 }
1718 let loaded = store.load().unwrap();
1719 let snap = loaded.buckets.get("b").unwrap();
1720 let current = snap.objects.get("k").expect("current object promoted");
1721 assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1722 }
1723
1724 #[test]
1725 fn versioned_load_trailing_delete_marker_hides_current() {
1726 let tmp = TempDir::new().unwrap();
1727 let store = new_store(&tmp);
1728 store
1729 .put_bucket_meta(
1730 "b",
1731 &BucketMeta {
1732 name: "b".to_string(),
1733 versioning: Some("Enabled".to_string()),
1734 ..Default::default()
1735 },
1736 )
1737 .unwrap();
1738 let base = chrono::Utc::now();
1739 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1740 .iter()
1741 .enumerate()
1742 {
1743 let mut m = sample_meta("k", body.len() as u64);
1744 m.version_id = Some((*vid).to_string());
1745 m.last_modified = base + chrono::Duration::seconds(i as i64);
1746 store
1747 .put_object(
1748 "b",
1749 "k",
1750 Some(*vid),
1751 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1752 &m,
1753 )
1754 .unwrap();
1755 }
1756 let mut dm = sample_meta("k", 0);
1758 dm.version_id = Some("dm1".to_string());
1759 dm.is_delete_marker = true;
1760 dm.last_modified = base + chrono::Duration::seconds(10);
1761 store
1762 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1763 .unwrap();
1764 let loaded = store.load().unwrap();
1765 let snap = loaded.buckets.get("b").unwrap();
1766 assert!(
1767 !snap.objects.contains_key("k"),
1768 "trailing delete marker must hide current object",
1769 );
1770 assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1771 }
1772
1773 #[test]
1774 fn legacy_null_object_overridden_by_newer_versions() {
1775 let tmp = TempDir::new().unwrap();
1778 let store = new_store(&tmp);
1779 store
1780 .put_bucket_meta(
1781 "b",
1782 &BucketMeta {
1783 name: "b".to_string(),
1784 ..Default::default()
1785 },
1786 )
1787 .unwrap();
1788 let base = chrono::Utc::now();
1789 let mut null_meta = sample_meta("k", 3);
1790 null_meta.last_modified = base;
1791 store
1792 .put_object(
1793 "b",
1794 "k",
1795 None,
1796 BodySource::Bytes(Bytes::from_static(b"old")),
1797 &null_meta,
1798 )
1799 .unwrap();
1800 store
1802 .put_bucket_meta(
1803 "b",
1804 &BucketMeta {
1805 name: "b".to_string(),
1806 versioning: Some("Enabled".to_string()),
1807 ..Default::default()
1808 },
1809 )
1810 .unwrap();
1811 let mut v1 = sample_meta("k", 3);
1812 v1.version_id = Some("v1".to_string());
1813 v1.last_modified = base + chrono::Duration::seconds(1);
1814 store
1815 .put_object(
1816 "b",
1817 "k",
1818 Some("v1"),
1819 BodySource::Bytes(Bytes::from_static(b"new")),
1820 &v1,
1821 )
1822 .unwrap();
1823 let mut v2 = sample_meta("k", 5);
1824 v2.version_id = Some("v2".to_string());
1825 v2.last_modified = base + chrono::Duration::seconds(2);
1826 store
1827 .put_object(
1828 "b",
1829 "k",
1830 Some("v2"),
1831 BodySource::Bytes(Bytes::from_static(b"newer")),
1832 &v2,
1833 )
1834 .unwrap();
1835 let loaded = store.load().unwrap();
1836 let snap = loaded.buckets.get("b").unwrap();
1837 let current = snap
1838 .objects
1839 .get("k")
1840 .expect("latest live version must override stale null");
1841 assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1842 assert_eq!(current.meta.size, 5);
1843 }
1844
1845 #[test]
1846 fn legacy_null_hidden_by_trailing_delete_marker() {
1847 let tmp = TempDir::new().unwrap();
1848 let store = new_store(&tmp);
1849 store
1850 .put_bucket_meta(
1851 "b",
1852 &BucketMeta {
1853 name: "b".to_string(),
1854 ..Default::default()
1855 },
1856 )
1857 .unwrap();
1858 let base = chrono::Utc::now();
1859 let mut null_meta = sample_meta("k", 3);
1860 null_meta.last_modified = base;
1861 store
1862 .put_object(
1863 "b",
1864 "k",
1865 None,
1866 BodySource::Bytes(Bytes::from_static(b"old")),
1867 &null_meta,
1868 )
1869 .unwrap();
1870 store
1871 .put_bucket_meta(
1872 "b",
1873 &BucketMeta {
1874 name: "b".to_string(),
1875 versioning: Some("Enabled".to_string()),
1876 ..Default::default()
1877 },
1878 )
1879 .unwrap();
1880 let mut v1 = sample_meta("k", 3);
1881 v1.version_id = Some("v1".to_string());
1882 v1.last_modified = base + chrono::Duration::seconds(1);
1883 store
1884 .put_object(
1885 "b",
1886 "k",
1887 Some("v1"),
1888 BodySource::Bytes(Bytes::from_static(b"new")),
1889 &v1,
1890 )
1891 .unwrap();
1892 let mut dm = sample_meta("k", 0);
1893 dm.version_id = Some("dm1".to_string());
1894 dm.is_delete_marker = true;
1895 dm.last_modified = base + chrono::Duration::seconds(2);
1896 store
1897 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1898 .unwrap();
1899 let loaded = store.load().unwrap();
1900 let snap = loaded.buckets.get("b").unwrap();
1901 assert!(
1902 !snap.objects.contains_key("k"),
1903 "trailing delete marker must hide even a legacy null object",
1904 );
1905 }
1906
1907 #[test]
1908 fn delete_marker_roundtrip_no_body_file() {
1909 let tmp = TempDir::new().unwrap();
1910 let store = new_store(&tmp);
1911 store
1912 .put_bucket_meta(
1913 "b",
1914 &BucketMeta {
1915 name: "b".to_string(),
1916 versioning: Some("Enabled".to_string()),
1917 ..Default::default()
1918 },
1919 )
1920 .unwrap();
1921 let mut m = sample_meta("k", 0);
1922 m.version_id = Some("dm1".to_string());
1923 m.is_delete_marker = true;
1924 store
1925 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1926 .unwrap();
1927 let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1929 assert!(!bin.exists(), "delete marker must not have a .bin file");
1930 assert!(toml_path.exists());
1931
1932 let loaded = store.load().unwrap();
1933 let versions = loaded
1934 .buckets
1935 .get("b")
1936 .unwrap()
1937 .object_versions
1938 .get("k")
1939 .unwrap();
1940 assert_eq!(versions.len(), 1);
1941 assert!(versions[0].meta.is_delete_marker);
1942 match &versions[0].body {
1943 BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1944 _ => panic!("delete marker body should be empty Memory"),
1945 }
1946 }
1947
1948 #[test]
1949 fn mixed_nonversioned_and_versioned_buckets() {
1950 let tmp = TempDir::new().unwrap();
1951 let store = new_store(&tmp);
1952 store
1953 .put_bucket_meta(
1954 "a",
1955 &BucketMeta {
1956 name: "a".to_string(),
1957 ..Default::default()
1958 },
1959 )
1960 .unwrap();
1961 store
1962 .put_bucket_meta(
1963 "b",
1964 &BucketMeta {
1965 name: "b".to_string(),
1966 versioning: Some("Enabled".to_string()),
1967 ..Default::default()
1968 },
1969 )
1970 .unwrap();
1971 let ma = sample_meta("only", 3);
1972 store
1973 .put_object(
1974 "a",
1975 "only",
1976 None,
1977 BodySource::Bytes(Bytes::from_static(b"aaa")),
1978 &ma,
1979 )
1980 .unwrap();
1981 let base = chrono::Utc::now();
1982 for (i, vid) in ["v1", "v2"].iter().enumerate() {
1983 let mut m = sample_meta("twice", 2);
1984 m.version_id = Some((*vid).to_string());
1985 m.last_modified = base + chrono::Duration::seconds(i as i64);
1986 store
1987 .put_object(
1988 "b",
1989 "twice",
1990 Some(*vid),
1991 BodySource::Bytes(Bytes::from_static(b"xx")),
1992 &m,
1993 )
1994 .unwrap();
1995 }
1996 let loaded = store.load().unwrap();
1997 assert_eq!(loaded.buckets.len(), 2);
1998 let a = loaded.buckets.get("a").unwrap();
1999 assert_eq!(a.objects.len(), 1);
2000 assert!(a.object_versions.is_empty());
2001 let b = loaded.buckets.get("b").unwrap();
2002 assert_eq!(
2005 b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2006 Some("v2")
2007 );
2008 assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2009 }
2010
2011 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2012 MpuInit {
2013 upload_id: upload_id.to_string(),
2014 key: key.to_string(),
2015 initiated: chrono::Utc::now(),
2016 content_type: "application/octet-stream".to_string(),
2017 storage_class: "STANDARD".to_string(),
2018 ..Default::default()
2019 }
2020 }
2021
2022 #[test]
2023 fn mpu_create_then_load_empty_parts() {
2024 let tmp = TempDir::new().unwrap();
2025 let store = new_store(&tmp);
2026 store
2027 .put_bucket_meta(
2028 "b",
2029 &BucketMeta {
2030 name: "b".to_string(),
2031 ..Default::default()
2032 },
2033 )
2034 .unwrap();
2035 let init = sample_mpu_init("up1", "k1");
2036 store.mpu_create("b", "up1", &init).unwrap();
2037 let loaded = store.load().unwrap();
2038 let snap = loaded.buckets.get("b").unwrap();
2039 let m = snap.multipart_uploads.get("up1").expect("upload present");
2040 assert_eq!(m.init.upload_id, "up1");
2041 assert_eq!(m.init.key, "k1");
2042 assert!(m.parts.is_empty());
2043 }
2044
2045 #[test]
2046 fn mpu_put_part_then_load_three_parts() {
2047 let tmp = TempDir::new().unwrap();
2048 let store = new_store(&tmp);
2049 store
2050 .put_bucket_meta(
2051 "b",
2052 &BucketMeta {
2053 name: "b".to_string(),
2054 ..Default::default()
2055 },
2056 )
2057 .unwrap();
2058 store
2059 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2060 .unwrap();
2061 let bodies = [
2062 Bytes::from_static(b"part-one"),
2063 Bytes::from_static(b"part-two-longer"),
2064 Bytes::from_static(b"p3"),
2065 ];
2066 for (i, body) in bodies.iter().enumerate() {
2067 let n = (i + 1) as u32;
2068 store
2069 .mpu_put_part(
2070 "b",
2071 "up",
2072 n,
2073 BodySource::Bytes(body.clone()),
2074 &format!("et{}", n),
2075 )
2076 .unwrap();
2077 }
2078 let loaded = store.load().unwrap();
2079 let snap = loaded.buckets.get("b").unwrap();
2080 let m = snap.multipart_uploads.get("up").unwrap();
2081 assert_eq!(m.parts.len(), 3);
2082 for (i, body) in bodies.iter().enumerate() {
2083 let n = (i + 1) as u32;
2084 let part = m.parts.get(&n).unwrap();
2085 assert_eq!(part.meta.part_number, n);
2086 assert_eq!(part.meta.size, body.len() as u64);
2087 assert_eq!(part.meta.etag, format!("et{}", n));
2088 match &part.body {
2089 BodyRef::Disk { path, size, .. } => {
2090 assert_eq!(*size, body.len() as u64);
2091 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2092 }
2093 _ => panic!("expected Disk"),
2094 }
2095 }
2096 }
2097
2098 #[test]
2099 fn mpu_abort_removes_upload() {
2100 let tmp = TempDir::new().unwrap();
2101 let store = new_store(&tmp);
2102 store
2103 .put_bucket_meta(
2104 "b",
2105 &BucketMeta {
2106 name: "b".to_string(),
2107 ..Default::default()
2108 },
2109 )
2110 .unwrap();
2111 store
2112 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2113 .unwrap();
2114 store
2115 .mpu_put_part(
2116 "b",
2117 "up",
2118 1,
2119 BodySource::Bytes(Bytes::from_static(b"x")),
2120 "e",
2121 )
2122 .unwrap();
2123 store.mpu_abort("b", "up").unwrap();
2124 assert!(!store.mpu_dir("b", "up").exists());
2125 store.mpu_abort("b", "up").unwrap();
2127 let loaded = store.load().unwrap();
2128 let snap = loaded.buckets.get("b").unwrap();
2129 assert!(snap.multipart_uploads.is_empty());
2130 }
2131
2132 #[test]
2133 fn mpu_complete_single_part_fast_path() {
2134 let tmp = TempDir::new().unwrap();
2135 let store = new_store(&tmp);
2136 store
2137 .put_bucket_meta(
2138 "b",
2139 &BucketMeta {
2140 name: "b".to_string(),
2141 ..Default::default()
2142 },
2143 )
2144 .unwrap();
2145 store
2146 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2147 .unwrap();
2148 let body = Bytes::from_static(b"only-part-bytes");
2149 store
2150 .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2151 .unwrap();
2152 let meta = sample_meta("k", body.len() as u64);
2153 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2154 match &body_ref {
2155 BodyRef::Disk { path, size, .. } => {
2156 assert_eq!(*size, body.len() as u64);
2157 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2158 }
2159 _ => panic!("expected Disk"),
2160 }
2161 assert!(!store.mpu_dir("b", "up").exists());
2162 }
2163
2164 #[test]
2165 fn mpu_complete_multi_part_concat() {
2166 let tmp = TempDir::new().unwrap();
2167 let store = new_store(&tmp);
2168 store
2169 .put_bucket_meta(
2170 "b",
2171 &BucketMeta {
2172 name: "b".to_string(),
2173 ..Default::default()
2174 },
2175 )
2176 .unwrap();
2177 store
2178 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2179 .unwrap();
2180 let p1 = Bytes::from_static(b"AAAA");
2181 let p2 = Bytes::from_static(b"BBBBBB");
2182 let p3 = Bytes::from_static(b"CC");
2183 for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2184 store
2185 .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2186 .unwrap();
2187 }
2188 let mut expected = Vec::new();
2189 expected.extend_from_slice(&p1);
2190 expected.extend_from_slice(&p2);
2191 expected.extend_from_slice(&p3);
2192 let meta = sample_meta("k", expected.len() as u64);
2193 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2194 let path = match body_ref {
2195 BodyRef::Disk { path, size, .. } => {
2196 assert_eq!(size, expected.len() as u64);
2197 path
2198 }
2199 _ => panic!("expected Disk"),
2200 };
2201 assert_eq!(std::fs::read(&path).unwrap(), expected);
2202 assert!(!store.mpu_dir("b", "up").exists());
2203 }
2204
2205 #[test]
2206 fn mpu_complete_large_streaming_via_file_source() {
2207 let tmp = TempDir::new().unwrap();
2208 let store = new_store(&tmp);
2209 store
2210 .put_bucket_meta(
2211 "b",
2212 &BucketMeta {
2213 name: "b".to_string(),
2214 ..Default::default()
2215 },
2216 )
2217 .unwrap();
2218 store
2219 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2220 .unwrap();
2221
2222 const PART_SIZE: usize = 1024 * 1024;
2225 let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2226 let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2227 for (i, byte) in patterns.iter().enumerate() {
2228 let src = tmp.path().join(format!("src-{}.bin", i + 1));
2229 let data = vec![*byte; PART_SIZE];
2230 std::fs::write(&src, &data).unwrap();
2231 expected.extend_from_slice(&data);
2232 store
2233 .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2234 .unwrap();
2235 }
2236 let meta = sample_meta("k", expected.len() as u64);
2237 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2238 let path = match body_ref {
2239 BodyRef::Disk { path, size, .. } => {
2240 assert_eq!(size, expected.len() as u64);
2241 path
2242 }
2243 _ => panic!("expected Disk"),
2244 };
2245 let actual = std::fs::read(&path).unwrap();
2246 assert_eq!(actual.len(), expected.len());
2247 assert_eq!(actual, expected);
2248 assert!(!store.mpu_dir("b", "up").exists());
2249 }
2250
2251 #[test]
2252 fn mpu_resumable_across_load() {
2253 let tmp = TempDir::new().unwrap();
2254 let store = new_store(&tmp);
2255 store
2256 .put_bucket_meta(
2257 "b",
2258 &BucketMeta {
2259 name: "b".to_string(),
2260 ..Default::default()
2261 },
2262 )
2263 .unwrap();
2264 store
2265 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2266 .unwrap();
2267 let p1 = Bytes::from_static(b"hello-");
2268 let p2 = Bytes::from_static(b"world-");
2269 store
2270 .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2271 .unwrap();
2272 store
2273 .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2274 .unwrap();
2275
2276 let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2278 let loaded = store2.load().unwrap();
2279 let snap = loaded.buckets.get("b").unwrap();
2280 let m = snap.multipart_uploads.get("up").unwrap();
2281 assert_eq!(m.parts.len(), 2);
2282 assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2283 assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2284
2285 let p3 = Bytes::from_static(b"again!");
2287 store2
2288 .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2289 .unwrap();
2290 let mut expected = Vec::new();
2291 expected.extend_from_slice(&p1);
2292 expected.extend_from_slice(&p2);
2293 expected.extend_from_slice(&p3);
2294 let meta = sample_meta("k", expected.len() as u64);
2295 let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2296 let path = match body_ref {
2297 BodyRef::Disk { path, .. } => path,
2298 _ => panic!(),
2299 };
2300 assert_eq!(std::fs::read(&path).unwrap(), expected);
2301 assert!(!store2.mpu_dir("b", "up").exists());
2302 }
2303
2304 #[test]
2305 fn tags_snapshot_roundtrip_via_store() {
2306 let tmp = TempDir::new().unwrap();
2307 let store = new_store(&tmp);
2308 store
2309 .put_bucket_meta(
2310 "b",
2311 &BucketMeta {
2312 name: "b".to_string(),
2313 ..Default::default()
2314 },
2315 )
2316 .unwrap();
2317 let mut tags = BTreeMap::new();
2318 tags.insert("env".to_string(), "prod".to_string());
2319 tags.insert("team".to_string(), "s3".to_string());
2320 let snap = TagsSnapshot { tags: tags.clone() };
2321 let payload = toml::to_string(&snap).unwrap();
2322 store
2323 .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2324 .unwrap();
2325 let loaded = store.load().unwrap();
2326 let text = loaded
2327 .buckets
2328 .get("b")
2329 .unwrap()
2330 .subresources
2331 .get("tags.toml")
2332 .cloned()
2333 .unwrap();
2334 let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2335 assert_eq!(decoded.tags, tags);
2336 }
2337
/// Round-trips an `AclSnapshot` with two grants through the store.
///
/// The snapshot is serialized to TOML, written as the bucket's `Acl`
/// subresource, and read back from the `acl.toml` entry returned by
/// `load()`. The owner id, grant count, the first grant's permission and
/// the second grant's grantee type are then checked.
#[test]
fn acl_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // One canonical-user grant (id + display name) and one group grant (URI only).
    let owner_grant = AclGrantSnapshot {
        grantee_type: String::from("CanonicalUser"),
        grantee_id: Some(String::from("owner-abc")),
        grantee_display_name: Some(String::from("owner")),
        grantee_uri: None,
        permission: String::from("FULL_CONTROL"),
    };
    let all_users_grant = AclGrantSnapshot {
        grantee_type: String::from("Group"),
        grantee_id: None,
        grantee_display_name: None,
        grantee_uri: Some(String::from(
            "http://acs.amazonaws.com/groups/global/AllUsers",
        )),
        permission: String::from("READ"),
    };
    let snap = AclSnapshot {
        owner_id: String::from("owner-abc"),
        grants: vec![owner_grant, all_users_grant],
    };

    let payload = toml::to_string(&snap).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
        .unwrap();

    let loaded = store.load().unwrap();
    let bucket = loaded.buckets.get("b").unwrap();
    let raw = bucket.subresources.get("acl.toml").unwrap().clone();
    let decoded: AclSnapshot = toml::from_str(&raw).unwrap();

    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
    assert_eq!(decoded.grants[1].grantee_type, "Group");
}
2391
/// Round-trips an `InventorySnapshot` holding two configurations through the store.
///
/// The id -> XML map is serialized to TOML, written as the bucket's
/// `Inventory` subresource, and must decode unchanged from the
/// `inventory.toml` entry returned by `load()`.
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Values contain double quotes, so the TOML layer's string escaping is exercised.
    let configs: BTreeMap<String, String> = [
        ("inv-1", "<InventoryConfiguration id=\"inv-1\"/>"),
        ("inv-2", "<InventoryConfiguration id=\"inv-2\"/>"),
    ]
    .iter()
    .map(|&(id, xml)| (id.to_string(), xml.to_string()))
    .collect();

    let payload = toml::to_string(&InventorySnapshot {
        configs: configs.clone(),
    })
    .unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
        .unwrap();

    let loaded = store.load().unwrap();
    let bucket = loaded.buckets.get("b").unwrap();
    let raw = bucket.subresources.get("inventory.toml").unwrap().clone();
    let decoded: InventorySnapshot = toml::from_str(&raw).unwrap();
    assert_eq!(decoded.configs, configs);
}
2433
/// Checks that a `versioning.toml` file placed in the bucket directory is
/// picked up by `load()` and surfaced as the bucket's `meta.versioning`.
#[test]
fn legacy_versioning_file_is_read() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Write the file by hand, bypassing the store API: it contains only the
    // bare status text (presumably the legacy on-disk layout, per the test name).
    let legacy_file = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&legacy_file, "Enabled").unwrap();

    let loaded = store.load().unwrap();
    let versioning = loaded.buckets.get("b").unwrap().meta.versioning.as_deref();
    assert_eq!(versioning, Some("Enabled"));
}
2455
/// Checks that one unparseable bucket does not fail the whole load.
///
/// A valid bucket `good` (with one object) is created through the store, and
/// a `bad` bucket directory is created by hand with a `meta.toml` that is not
/// valid TOML. `load()` must succeed, include `good`, and omit `bad`.
#[test]
fn load_skips_corrupt_bucket_and_keeps_others() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    // A well-formed bucket holding a single two-byte object.
    let good_meta = BucketMeta {
        name: String::from("good"),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("good", &good_meta).unwrap();
    let body = BodySource::Bytes(Bytes::from_static(b"hi"));
    let object_meta = sample_meta("k", 2);
    store
        .put_object("good", "k", None, body, &object_meta)
        .unwrap();

    // A sibling bucket directory whose metadata file cannot be parsed.
    let bad_dir = store.buckets_dir().join("bad");
    std::fs::create_dir_all(&bad_dir).unwrap();
    std::fs::write(bad_dir.join("meta.toml"), b"not valid toml = = =").unwrap();

    let loaded = store.load().unwrap();
    let buckets = &loaded.buckets;
    assert!(buckets.contains_key("good"), "valid bucket must load");
    assert!(
        !buckets.contains_key("bad"),
        "corrupt bucket must be skipped, not abort the load"
    );
}
2495}