1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11 #[serde(default)]
12 pub account_id: String,
13 #[serde(default)]
14 pub region: String,
15 #[serde(default)]
16 pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21 pub meta: BucketMeta,
22 #[serde(default)]
23 pub objects: BTreeMap<String, LoadedObject>,
24 #[serde(default)]
25 pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
26 #[serde(default)]
27 pub subresources: BTreeMap<String, String>,
28 #[serde(default)]
29 pub multipart_uploads: BTreeMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34 pub init: MpuInit,
35 #[serde(default)]
36 pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41 pub meta: UploadPartMeta,
42 #[serde(default)]
43 pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48 pub meta: ObjectMeta,
49 #[serde(default)]
50 pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55 #[serde(default)]
56 pub name: String,
57 #[serde(default = "default_time")]
58 pub creation_date: DateTime<Utc>,
59 #[serde(default)]
60 pub region: String,
61 #[serde(default)]
62 pub versioning: Option<String>,
63 #[serde(default)]
65 pub mfa_delete: Option<String>,
66 #[serde(default)]
67 pub acl: Option<String>,
68 #[serde(default)]
69 pub acl_owner_id: String,
70 #[serde(default)]
71 pub accelerate_status: Option<String>,
72 #[serde(default)]
73 pub eventbridge_enabled: bool,
74 #[serde(default)]
75 pub lifecycle_transition_default_min_size: Option<String>,
76}
77
78fn default_time() -> DateTime<Utc> {
79 DateTime::<Utc>::MIN_UTC
80}
81
82#[derive(Debug, Default, Clone, Serialize, Deserialize)]
83pub struct AclGrantSnapshot {
84 pub grantee_type: String,
85 #[serde(default)]
86 pub grantee_id: Option<String>,
87 #[serde(default)]
88 pub grantee_display_name: Option<String>,
89 #[serde(default)]
90 pub grantee_uri: Option<String>,
91 pub permission: String,
92}
93
94#[derive(Debug, Default, Clone, Serialize, Deserialize)]
95pub struct TagsSnapshot {
96 #[serde(default)]
97 pub tags: BTreeMap<String, String>,
98}
99
100#[derive(Debug, Default, Clone, Serialize, Deserialize)]
101pub struct AclSnapshot {
102 #[serde(default)]
103 pub owner_id: String,
104 #[serde(default)]
105 pub grants: Vec<AclGrantSnapshot>,
106}
107
108#[derive(Debug, Default, Clone, Serialize, Deserialize)]
109pub struct InventorySnapshot {
110 #[serde(default)]
111 pub configs: BTreeMap<String, String>,
112}
113
114#[derive(Debug, Default, Clone, Serialize, Deserialize)]
115pub struct ObjectMeta {
116 #[serde(default)]
117 pub key: String,
118 #[serde(default)]
119 pub content_type: String,
120 #[serde(default)]
121 pub etag: String,
122 #[serde(default)]
123 pub size: u64,
124 #[serde(default = "default_time")]
125 pub last_modified: DateTime<Utc>,
126 #[serde(default)]
127 pub metadata: BTreeMap<String, String>,
128 #[serde(default)]
129 pub tags: BTreeMap<String, String>,
130 #[serde(default)]
131 pub storage_class: String,
132 #[serde(default)]
133 pub acl_grants: Vec<AclGrantSnapshot>,
134 #[serde(default)]
135 pub acl_owner_id: Option<String>,
136 #[serde(default)]
137 pub parts_count: Option<u32>,
138 #[serde(default)]
139 pub part_sizes: Option<Vec<(u32, u64)>>,
140 #[serde(default)]
141 pub sse_algorithm: Option<String>,
142 #[serde(default)]
143 pub sse_kms_key_id: Option<String>,
144 #[serde(default)]
145 pub bucket_key_enabled: Option<bool>,
146 #[serde(default)]
147 pub version_id: Option<String>,
148 #[serde(default)]
149 pub is_delete_marker: bool,
150 #[serde(default)]
151 pub restore_ongoing: Option<bool>,
152 #[serde(default)]
153 pub restore_expiry: Option<String>,
154 #[serde(default)]
155 pub checksum_algorithm: Option<String>,
156 #[serde(default)]
157 pub checksum_value: Option<String>,
158 #[serde(default)]
159 pub lock_mode: Option<String>,
160 #[serde(default)]
161 pub lock_retain_until: Option<DateTime<Utc>>,
162 #[serde(default)]
163 pub lock_legal_hold: Option<String>,
164 #[serde(default)]
165 pub content_encoding: Option<String>,
166 #[serde(default)]
167 pub website_redirect_location: Option<String>,
168}
169
170#[derive(Debug, Default, Clone, Serialize, Deserialize)]
171pub struct MpuInit {
172 pub upload_id: String,
173 pub key: String,
174 #[serde(default = "default_time")]
175 pub initiated: DateTime<Utc>,
176 #[serde(default)]
177 pub metadata: BTreeMap<String, String>,
178 #[serde(default)]
179 pub content_type: String,
180 #[serde(default)]
181 pub storage_class: String,
182 #[serde(default)]
183 pub sse_algorithm: Option<String>,
184 #[serde(default)]
185 pub sse_kms_key_id: Option<String>,
186 #[serde(default)]
187 pub tagging: Option<String>,
188 #[serde(default)]
189 pub acl_grants: Vec<AclGrantSnapshot>,
190 #[serde(default)]
191 pub checksum_algorithm: Option<String>,
192}
193
194#[derive(Debug, Default, Clone, Serialize, Deserialize)]
195pub struct UploadPartMeta {
196 pub part_number: u32,
197 pub etag: String,
198 pub size: u64,
199 #[serde(default = "default_time")]
200 pub last_modified: DateTime<Utc>,
201}
202
/// The kinds of per-bucket subresource a store can persist.
///
/// `DiskS3Store` maps every variant to its own file inside the bucket
/// directory, and `ALL_SUBRESOURCES` lists every variant for iteration.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
229
230pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
231 BucketSubresource::Tags,
232 BucketSubresource::Lifecycle,
233 BucketSubresource::Cors,
234 BucketSubresource::Policy,
235 BucketSubresource::Notification,
236 BucketSubresource::Logging,
237 BucketSubresource::Website,
238 BucketSubresource::PublicAccessBlock,
239 BucketSubresource::ObjectLock,
240 BucketSubresource::Replication,
241 BucketSubresource::Ownership,
242 BucketSubresource::Inventory,
243 BucketSubresource::Encryption,
244 BucketSubresource::Versioning,
245 BucketSubresource::Acl,
246 BucketSubresource::Accelerate,
247 BucketSubresource::Analytics,
248 BucketSubresource::IntelligentTiering,
249 BucketSubresource::Metrics,
250 BucketSubresource::RequestPayment,
251 BucketSubresource::Abac,
252 BucketSubresource::MetadataConfiguration,
253 BucketSubresource::MetadataTableConfiguration,
254];
255
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub enum BodyRef {
258 #[serde(skip)]
259 Memory(Bytes),
260 Disk {
261 bucket: String,
262 key: String,
263 #[serde(default)]
264 version: Option<String>,
265 path: PathBuf,
266 size: u64,
267 },
268}
269
270impl BodyRef {
271 pub fn size(&self) -> u64 {
272 match self {
273 BodyRef::Memory(b) => b.len() as u64,
274 BodyRef::Disk { size, .. } => *size,
275 }
276 }
277}
278
279impl Default for BodyRef {
280 fn default() -> Self {
281 BodyRef::Memory(Bytes::new())
282 }
283}
284
285#[derive(Debug)]
286pub enum BodySource {
287 Bytes(Bytes),
288 File(PathBuf),
291 FileCopy(PathBuf),
295}
296
297fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
303 match body {
304 BodySource::Bytes(b) => Ok(b),
305 BodySource::File(p) => {
306 let bytes = std::fs::read(&p)?;
307 let _ = std::fs::remove_file(&p);
310 Ok(Bytes::from(bytes))
311 }
312 BodySource::FileCopy(p) => {
313 let bytes = std::fs::read(&p)?;
314 Ok(Bytes::from(bytes))
315 }
316 }
317}
318
319#[derive(Debug, Error)]
320pub enum StoreError {
321 #[error("io error: {0}")]
322 Io(#[from] std::io::Error),
323 #[error("serialization error: {0}")]
324 Serde(String),
325 #[error("not supported by this store")]
326 NotSupported,
327 #[error("{0}")]
328 Other(String),
329}
330
331pub type StoreResult<T> = Result<T, StoreError>;
332
333pub trait S3Store: Send + Sync {
334 fn load(&self) -> StoreResult<S3State>;
335
336 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
337 fn put_bucket_subresource(
338 &self,
339 bucket: &str,
340 kind: BucketSubresource,
341 payload: &str,
342 ) -> StoreResult<()>;
343 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
344 fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
345
346 fn put_object(
347 &self,
348 bucket: &str,
349 key: &str,
350 version: Option<&str>,
351 body: BodySource,
352 meta: &ObjectMeta,
353 ) -> StoreResult<BodyRef>;
354 fn put_object_meta(
355 &self,
356 bucket: &str,
357 key: &str,
358 version: Option<&str>,
359 meta: &ObjectMeta,
360 ) -> StoreResult<()>;
361 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
362 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
363
364 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
365 fn mpu_put_part(
371 &self,
372 bucket: &str,
373 upload_id: &str,
374 part_number: u32,
375 body: BodySource,
376 etag: &str,
377 ) -> StoreResult<BodyRef>;
378 fn spool_dir(&self) -> Option<std::path::PathBuf> {
385 None
386 }
387 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
388 fn mpu_complete(
389 &self,
390 bucket: &str,
391 upload_id: &str,
392 final_key: &str,
393 version: Option<&str>,
394 meta: &ObjectMeta,
395 ) -> StoreResult<BodyRef>;
396}
397
/// [`S3Store`] implementation that persists nothing: it has no fields, and
/// object bodies are handed back to the caller as in-memory `BodyRef`s.
pub struct MemoryS3Store;
399
400impl MemoryS3Store {
401 pub fn new() -> Self {
402 Self
403 }
404}
405
406impl Default for MemoryS3Store {
407 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl S3Store for MemoryS3Store {
413 fn load(&self) -> StoreResult<S3State> {
414 Ok(S3State::default())
415 }
416
417 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
418 Ok(())
419 }
420 fn put_bucket_subresource(
421 &self,
422 _bucket: &str,
423 _kind: BucketSubresource,
424 _payload: &str,
425 ) -> StoreResult<()> {
426 Ok(())
427 }
428 fn delete_bucket_subresource(
429 &self,
430 _bucket: &str,
431 _kind: BucketSubresource,
432 ) -> StoreResult<()> {
433 Ok(())
434 }
435 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
436 Ok(())
437 }
438
439 fn put_object(
440 &self,
441 _bucket: &str,
442 _key: &str,
443 _version: Option<&str>,
444 body: BodySource,
445 _meta: &ObjectMeta,
446 ) -> StoreResult<BodyRef> {
447 Ok(BodyRef::Memory(read_body_source(body)?))
448 }
449 fn put_object_meta(
450 &self,
451 _bucket: &str,
452 _key: &str,
453 _version: Option<&str>,
454 _meta: &ObjectMeta,
455 ) -> StoreResult<()> {
456 Ok(())
457 }
458 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
459 Ok(())
460 }
461 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
462 match body {
463 BodyRef::Memory(b) => Ok(b.clone()),
464 BodyRef::Disk { .. } => {
465 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
466 }
467 }
468 }
469
470 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
471 Ok(())
472 }
473 fn mpu_put_part(
474 &self,
475 _bucket: &str,
476 _upload_id: &str,
477 _part_number: u32,
478 body: BodySource,
479 _etag: &str,
480 ) -> StoreResult<BodyRef> {
481 Ok(BodyRef::Memory(read_body_source(body)?))
484 }
485 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
486 Ok(())
487 }
488 fn mpu_complete(
489 &self,
490 _bucket: &str,
491 _upload_id: &str,
492 _final_key: &str,
493 _version: Option<&str>,
494 _meta: &ObjectMeta,
495 ) -> StoreResult<BodyRef> {
496 Ok(BodyRef::Memory(Bytes::new()))
497 }
498}
499
500pub struct DiskS3Store {
501 root: PathBuf,
502 cache: std::sync::Arc<crate::cache::BodyCache>,
503}
504
505impl DiskS3Store {
506 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
507 Self { root, cache }
508 }
509
510 fn buckets_dir(&self) -> PathBuf {
511 self.root.join("buckets")
512 }
513
514 fn bucket_dir(&self, bucket: &str) -> PathBuf {
515 self.buckets_dir()
516 .join(crate::key_escape::escape_key_segment(bucket))
517 }
518
519 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
520 self.bucket_dir(bucket)
521 .join("objects")
522 .join(crate::key_escape::escape_key_segment(key))
523 }
524
525 fn version_tag(version: Option<&str>) -> String {
526 version.unwrap_or("null").to_string()
527 }
528
529 fn object_paths(
530 &self,
531 bucket: &str,
532 key: &str,
533 version: Option<&str>,
534 ) -> (PathBuf, PathBuf, PathBuf) {
535 let dir = self.object_dir(bucket, key);
536 let tag = Self::version_tag(version);
537 let bin = dir.join(format!("{}.bin", tag));
538 let toml = dir.join(format!("{}.toml", tag));
539 (dir, bin, toml)
540 }
541
542 fn subresource_filename(kind: BucketSubresource) -> &'static str {
543 match kind {
544 BucketSubresource::Tags => "tags.toml",
545 BucketSubresource::Lifecycle => "lifecycle.toml",
546 BucketSubresource::Cors => "cors.toml",
547 BucketSubresource::Policy => "policy.toml",
548 BucketSubresource::Notification => "notification.toml",
549 BucketSubresource::Logging => "logging.toml",
550 BucketSubresource::Website => "website.toml",
551 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
552 BucketSubresource::ObjectLock => "object_lock.toml",
553 BucketSubresource::Replication => "replication.toml",
554 BucketSubresource::Ownership => "ownership.toml",
555 BucketSubresource::Inventory => "inventory.toml",
556 BucketSubresource::Encryption => "encryption.toml",
557 BucketSubresource::Versioning => "versioning.toml",
558 BucketSubresource::Acl => "acl.toml",
559 BucketSubresource::Accelerate => "accelerate.toml",
560 BucketSubresource::Analytics => "analytics.toml",
561 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
562 BucketSubresource::Metrics => "metrics.toml",
563 BucketSubresource::RequestPayment => "request_payment.toml",
564 BucketSubresource::Abac => "abac.toml",
565 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
566 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
567 }
568 }
569
570 fn cleanup_empty(dir: &std::path::Path) {
571 let _ = std::fs::remove_dir(dir);
572 }
573
574 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
575 self.bucket_dir(bucket)
576 .join("mpu")
577 .join(crate::key_escape::escape_key_segment(upload_id))
578 }
579
580 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
581 self.mpu_dir(bucket, upload_id).join("parts")
582 }
583
584 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
585 self.mpu_parts_dir(bucket, upload_id)
586 .join(format!("{}.bin", part_number))
587 }
588
589 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
590 self.mpu_parts_dir(bucket, upload_id)
591 .join(format!("{}.toml", part_number))
592 }
593
594 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
595 crate::cache::BodyKey::new(
596 bucket.to_string(),
597 format!("__mpu__/{}", upload_id),
598 Some(format!("part-{}", part_number)),
599 )
600 }
601}
602
603fn io_other(msg: impl Into<String>) -> StoreError {
604 StoreError::Other(msg.into())
605}
606
607impl S3Store for DiskS3Store {
608 fn load(&self) -> StoreResult<S3State> {
609 let mut state = S3State::default();
610 let buckets_dir = self.buckets_dir();
611 if !buckets_dir.exists() {
612 return Ok(state);
613 }
614 for entry in std::fs::read_dir(&buckets_dir)? {
615 let entry = entry?;
616 if !entry.file_type()?.is_dir() {
617 continue;
618 }
619 let bdir = entry.path();
620 let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
625 let meta_path = bdir.join("meta.toml");
626 if !meta_path.exists() {
627 return Ok(None);
628 }
629 let meta_text = std::fs::read_to_string(&meta_path)?;
630 let mut meta: BucketMeta =
631 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
632 let mut snap = BucketSnapshot {
633 meta: meta.clone(),
634 objects: BTreeMap::new(),
635 object_versions: BTreeMap::new(),
636 subresources: BTreeMap::new(),
637 multipart_uploads: BTreeMap::new(),
638 };
639
640 for kind in ALL_SUBRESOURCES {
641 let fname = Self::subresource_filename(*kind);
642 let path = bdir.join(fname);
643 if path.exists() {
644 let text = std::fs::read_to_string(&path)?;
645 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
646 {
647 let stripped = text.trim();
648 if !stripped.is_empty() {
649 snap.meta.versioning = Some(stripped.to_string());
650 meta.versioning = snap.meta.versioning.clone();
651 }
652 }
653 snap.subresources.insert(fname.to_string(), text);
654 }
655 }
656
657 let objects_root = bdir.join("objects");
658 if objects_root.exists() {
659 for okey_entry in std::fs::read_dir(&objects_root)? {
660 let okey_entry = okey_entry?;
661 if !okey_entry.file_type()?.is_dir() {
662 continue;
663 }
664 let key_dir = okey_entry.path();
665 let mut versioned: Vec<LoadedObject> = Vec::new();
666 let mut key_name: Option<String> = None;
667 for version_entry in std::fs::read_dir(&key_dir)? {
668 let version_entry = version_entry?;
669 let path = version_entry.path();
670 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
671 continue;
672 };
673 if !fname.ends_with(".toml") {
674 continue;
675 }
676 let version_tag = &fname[..fname.len() - 5];
677 let toml_text = std::fs::read_to_string(&path)?;
678 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
679 .map_err(|e| StoreError::Serde(e.to_string()))?;
680 let bin_path = key_dir.join(format!("{}.bin", version_tag));
681 let (body, size) = if obj_meta.is_delete_marker {
682 (BodyRef::Memory(Bytes::new()), 0u64)
683 } else if bin_path.exists() {
684 let sz = std::fs::metadata(&bin_path)?.len();
685 (
686 BodyRef::Disk {
687 bucket: meta.name.clone(),
688 key: obj_meta.key.clone(),
689 version: if version_tag == "null" {
690 None
691 } else {
692 Some(version_tag.to_string())
693 },
694 path: bin_path,
695 size: sz,
696 },
697 sz,
698 )
699 } else {
700 return Err(StoreError::Other(format!(
705 "missing body file: {}",
706 bin_path.display()
707 )));
708 };
709 let _ = size;
710 key_name.get_or_insert_with(|| obj_meta.key.clone());
711 if version_tag == "null" && obj_meta.version_id.is_none() {
712 snap.objects.insert(
713 obj_meta.key.clone(),
714 LoadedObject {
715 meta: obj_meta,
716 body,
717 },
718 );
719 } else {
720 versioned.push(LoadedObject {
721 meta: obj_meta,
722 body,
723 });
724 }
725 }
726 if !versioned.is_empty() {
727 versioned.sort_by_key(|v| v.meta.last_modified);
728 if let Some(key) = key_name {
729 match versioned.last() {
736 Some(newest) if newest.meta.is_delete_marker => {
737 snap.objects.remove(&key);
738 }
739 Some(newest) => {
740 snap.objects.insert(key.clone(), newest.clone());
741 }
742 None => {}
743 }
744 snap.object_versions.insert(key, versioned);
745 }
746 }
747 }
748 }
749
750 let mpu_root = bdir.join("mpu");
751 if mpu_root.exists() {
752 for upload_entry in std::fs::read_dir(&mpu_root)? {
753 let upload_entry = upload_entry?;
754 if !upload_entry.file_type()?.is_dir() {
755 continue;
756 }
757 let upload_dir = upload_entry.path();
758 let init_path = upload_dir.join("init.toml");
759 if !init_path.exists() {
760 continue;
761 }
762 let init_text = std::fs::read_to_string(&init_path)?;
763 let init: MpuInit = toml::from_str(&init_text)
764 .map_err(|e| StoreError::Serde(e.to_string()))?;
765 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
766 let parts_dir = upload_dir.join("parts");
767 if parts_dir.exists() {
768 for part_entry in std::fs::read_dir(&parts_dir)? {
769 let part_entry = part_entry?;
770 let path = part_entry.path();
771 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
772 continue;
773 };
774 if !fname.ends_with(".toml") {
775 continue;
776 }
777 let stem = &fname[..fname.len() - 5];
778 let Ok(part_number) = stem.parse::<u32>() else {
779 continue;
780 };
781 let toml_text = std::fs::read_to_string(&path)?;
782 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
783 .map_err(|e| StoreError::Serde(e.to_string()))?;
784 let bin_path = parts_dir.join(format!("{}.bin", part_number));
785 if !bin_path.exists() {
786 return Err(StoreError::Other(format!(
787 "missing multipart part body file: {}",
788 bin_path.display()
789 )));
790 }
791 let sz = std::fs::metadata(&bin_path)?.len();
792 let body = BodyRef::Disk {
793 bucket: meta.name.clone(),
794 key: format!("__mpu__/{}", init.upload_id),
795 version: Some(format!("part-{}", part_number)),
796 path: bin_path,
797 size: sz,
798 };
799 loaded_parts.insert(
800 part_number,
801 LoadedPart {
802 meta: part_meta,
803 body,
804 },
805 );
806 }
807 }
808 snap.multipart_uploads.insert(
809 init.upload_id.clone(),
810 LoadedMpu {
811 init,
812 parts: loaded_parts,
813 },
814 );
815 }
816 }
817
818 Ok(Some(snap))
819 })();
820 match loaded {
821 Ok(Some(snap)) => {
822 state.buckets.insert(snap.meta.name.clone(), snap);
823 }
824 Ok(None) => {}
825 Err(e) => tracing::warn!(
826 bucket = %bdir.display(),
827 error = %e,
828 "skipping unreadable S3 bucket during load"
829 ),
830 }
831 }
832 Ok(state)
833 }
834
835 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
836 let dir = self.bucket_dir(bucket);
837 std::fs::create_dir_all(&dir)?;
838 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
839 Ok(())
840 }
841
842 fn put_bucket_subresource(
843 &self,
844 bucket: &str,
845 kind: BucketSubresource,
846 payload: &str,
847 ) -> StoreResult<()> {
848 let dir = self.bucket_dir(bucket);
849 std::fs::create_dir_all(&dir)?;
850 let path = dir.join(Self::subresource_filename(kind));
851 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
852 Ok(())
853 }
854
855 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
856 let path = self
857 .bucket_dir(bucket)
858 .join(Self::subresource_filename(kind));
859 match std::fs::remove_file(&path) {
860 Ok(_) => Ok(()),
861 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
862 Err(e) => Err(e.into()),
863 }
864 }
865
866 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
867 let dir = self.bucket_dir(bucket);
868 match std::fs::remove_dir_all(&dir) {
869 Ok(_) => Ok(()),
870 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
871 Err(e) => Err(e.into()),
872 }
873 }
874
875 fn put_object(
876 &self,
877 bucket: &str,
878 key: &str,
879 version: Option<&str>,
880 body: BodySource,
881 meta: &ObjectMeta,
882 ) -> StoreResult<BodyRef> {
883 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
884 std::fs::create_dir_all(&dir)?;
885
886 if meta.is_delete_marker {
887 crate::atomic::write_atomic_toml(&toml_path, meta)?;
888 return Ok(BodyRef::Memory(Bytes::new()));
889 }
890
891 let size: u64;
892 let bytes_for_cache: Option<Bytes>;
893 match body {
894 BodySource::Bytes(b) => {
895 size = b.len() as u64;
896 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
897 bytes_for_cache = Some(b);
898 }
899 BodySource::File(src) => {
900 let src_size = std::fs::metadata(&src)?.len();
901 size = src_size;
902 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
903 bytes_for_cache = None;
904 }
905 BodySource::FileCopy(src) => {
906 let src_size = std::fs::metadata(&src)?.len();
907 size = src_size;
908 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
909 bytes_for_cache = None;
910 }
911 }
912
913 crate::atomic::write_atomic_toml(&toml_path, meta)?;
914
915 let body_key = crate::cache::BodyKey::new(
916 bucket.to_string(),
917 key.to_string(),
918 version.map(|s| s.to_string()),
919 );
920 if let Some(b) = bytes_for_cache {
921 self.cache.insert(body_key, b);
922 } else {
923 self.cache.invalidate(&crate::cache::BodyKey::new(
924 bucket.to_string(),
925 key.to_string(),
926 version.map(|s| s.to_string()),
927 ));
928 }
929
930 Ok(BodyRef::Disk {
931 bucket: bucket.to_string(),
932 key: key.to_string(),
933 version: version.map(|s| s.to_string()),
934 path: bin_path,
935 size,
936 })
937 }
938
939 fn put_object_meta(
940 &self,
941 bucket: &str,
942 key: &str,
943 version: Option<&str>,
944 meta: &ObjectMeta,
945 ) -> StoreResult<()> {
946 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
947 std::fs::create_dir_all(&dir)?;
948 crate::atomic::write_atomic_toml(&toml_path, meta)?;
949 Ok(())
950 }
951
952 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
953 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
954 for p in [&bin_path, &toml_path] {
955 match std::fs::remove_file(p) {
956 Ok(_) => {}
957 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
958 Err(e) => return Err(e.into()),
959 }
960 }
961 Self::cleanup_empty(&dir);
962
963 self.cache.invalidate(&crate::cache::BodyKey::new(
964 bucket.to_string(),
965 key.to_string(),
966 version.map(|s| s.to_string()),
967 ));
968 Ok(())
969 }
970
971 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
972 match body {
973 BodyRef::Memory(b) => Ok(b.clone()),
974 BodyRef::Disk {
975 bucket,
976 key,
977 version,
978 path,
979 size: _,
980 } => {
981 let body_key =
982 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
983 if let Some(bytes) = self.cache.get(&body_key) {
984 return Ok(bytes);
985 }
986 let bytes = Bytes::from(std::fs::read(path)?);
987 self.cache.insert(body_key, bytes.clone());
988 Ok(bytes)
989 }
990 }
991 }
992
993 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
994 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
995 std::fs::create_dir_all(&parts_dir)?;
996 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
997 crate::atomic::write_atomic_toml(&init_path, init)?;
998 Ok(())
999 }
1000
1001 fn mpu_put_part(
1002 &self,
1003 bucket: &str,
1004 upload_id: &str,
1005 part_number: u32,
1006 body: BodySource,
1007 etag: &str,
1008 ) -> StoreResult<BodyRef> {
1009 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1010 std::fs::create_dir_all(&parts_dir)?;
1011 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1012 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1013
1014 let size: u64 = match body {
1015 BodySource::Bytes(b) => {
1016 let n = b.len() as u64;
1017 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1018 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1019 self.cache.insert(cache_key, b);
1020 n
1021 }
1022 BodySource::File(src) => {
1023 let n = std::fs::metadata(&src)?.len();
1024 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1025 self.cache
1026 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1027 n
1028 }
1029 BodySource::FileCopy(src) => {
1030 let n = std::fs::metadata(&src)?.len();
1031 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1032 self.cache
1033 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1034 n
1035 }
1036 };
1037
1038 let meta = UploadPartMeta {
1039 part_number,
1040 etag: etag.to_string(),
1041 size,
1042 last_modified: Utc::now(),
1043 };
1044 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1045 Ok(BodyRef::Disk {
1048 bucket: bucket.to_string(),
1049 key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1050 version: None,
1051 path: bin_path,
1052 size,
1053 })
1054 }
1055
1056 fn spool_dir(&self) -> Option<std::path::PathBuf> {
1057 let dir = self.root.join(".spool");
1058 let _ = std::fs::create_dir_all(&dir);
1062 Some(dir)
1063 }
1064
1065 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1066 let dir = self.mpu_dir(bucket, upload_id);
1067 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1069 for entry in entries.flatten() {
1070 let path = entry.path();
1071 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1072 if let Some(stem) = fname.strip_suffix(".bin") {
1073 if let Ok(n) = stem.parse::<u32>() {
1074 self.cache
1075 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1076 }
1077 }
1078 }
1079 }
1080 }
1081 match std::fs::remove_dir_all(&dir) {
1082 Ok(_) => Ok(()),
1083 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1084 Err(e) => Err(e.into()),
1085 }
1086 }
1087
1088 fn mpu_complete(
1089 &self,
1090 bucket: &str,
1091 upload_id: &str,
1092 final_key: &str,
1093 version: Option<&str>,
1094 meta: &ObjectMeta,
1095 ) -> StoreResult<BodyRef> {
1096 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1097 if !parts_dir.exists() {
1098 return Err(io_other(format!(
1099 "mpu_complete: no parts dir for upload {}",
1100 upload_id
1101 )));
1102 }
1103
1104 let mut part_numbers: Vec<u32> = Vec::new();
1106 for entry in std::fs::read_dir(&parts_dir)? {
1107 let entry = entry?;
1108 let path = entry.path();
1109 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1110 continue;
1111 };
1112 if let Some(stem) = fname.strip_suffix(".toml") {
1113 if let Ok(n) = stem.parse::<u32>() {
1114 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1115 part_numbers.push(n);
1116 }
1117 }
1118 }
1119 }
1120 part_numbers.sort_unstable();
1121 if part_numbers.is_empty() {
1122 return Err(io_other(format!(
1123 "mpu_complete: upload {} has no parts",
1124 upload_id
1125 )));
1126 }
1127
1128 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1129 std::fs::create_dir_all(&dir)?;
1130
1131 let total_size: u64 = if part_numbers.len() == 1 {
1132 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1133 let sz = std::fs::metadata(&only)?.len();
1134 match std::fs::rename(&only, &bin_path) {
1135 Ok(_) => {}
1136 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1137 {
1139 let mut input = std::fs::File::open(&only)?;
1140 let tmp = {
1141 let mut os = bin_path.as_os_str().to_owned();
1142 os.push(".tmp");
1143 PathBuf::from(os)
1144 };
1145 let mut out = std::fs::OpenOptions::new()
1146 .write(true)
1147 .create(true)
1148 .truncate(true)
1149 .open(&tmp)?;
1150 std::io::copy(&mut input, &mut out)?;
1151 out.sync_all()?;
1152 std::fs::rename(&tmp, &bin_path)?;
1153 }
1154 let _ = std::fs::remove_file(&only);
1155 }
1156 Err(e) => return Err(e.into()),
1157 }
1158 if let Some(parent) = bin_path.parent() {
1163 if let Ok(dir_handle) = std::fs::File::open(parent) {
1164 let _ = dir_handle.sync_all();
1165 }
1166 }
1167 sz
1168 } else {
1169 let tmp = {
1170 let mut os = bin_path.as_os_str().to_owned();
1171 os.push(".tmp");
1172 PathBuf::from(os)
1173 };
1174 let mut total: u64 = 0;
1175 {
1176 let mut out = std::fs::OpenOptions::new()
1177 .write(true)
1178 .create(true)
1179 .truncate(true)
1180 .open(&tmp)?;
1181 for n in &part_numbers {
1182 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1183 let mut input = std::fs::File::open(&part_path)?;
1184 let copied = std::io::copy(&mut input, &mut out)?;
1185 total += copied;
1186 }
1187 out.sync_all()?;
1188 }
1189 std::fs::rename(&tmp, &bin_path)?;
1190 if let Some(parent) = bin_path.parent() {
1191 if let Ok(dir_handle) = std::fs::File::open(parent) {
1192 let _ = dir_handle.sync_all();
1193 }
1194 }
1195 total
1196 };
1197
1198 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1199
1200 for n in &part_numbers {
1202 self.cache
1203 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1204 }
1205 let mpu_dir = self.mpu_dir(bucket, upload_id);
1206 let _ = std::fs::remove_dir_all(&mpu_dir);
1207
1208 self.cache.invalidate(&crate::cache::BodyKey::new(
1213 bucket.to_string(),
1214 final_key.to_string(),
1215 version.map(|s| s.to_string()),
1216 ));
1217
1218 Ok(BodyRef::Disk {
1219 bucket: bucket.to_string(),
1220 key: final_key.to_string(),
1221 version: version.map(|s| s.to_string()),
1222 path: bin_path,
1223 size: total_size,
1224 })
1225 }
1226}
1227
/// Returns the raw OS error code that a rename reports when source and
/// destination live on different filesystems/volumes.
///
/// Callers compare this against `io::Error::raw_os_error()` to decide whether
/// to fall back to copy-then-delete.
///
/// * Unix-like targets: `EXDEV`, which is 18 on Linux, macOS and the BSDs.
/// * Windows: `ERROR_NOT_SAME_DEVICE`, which is 17. Returning 18 there would
///   never match, so the cross-device fallback could not trigger.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17
    } else {
        18
    }
}
1231
1232#[cfg(test)]
1233mod disk_tests {
1234 use super::*;
1235 use std::sync::Arc;
1236 use tempfile::TempDir;
1237
1238 fn new_store(tmp: &TempDir) -> DiskS3Store {
1239 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1240 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1241 }
1242
1243 fn new_store_with_cache(
1244 tmp: &TempDir,
1245 cap: u64,
1246 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1247 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1248 (
1249 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1250 cache,
1251 )
1252 }
1253
1254 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1255 ObjectMeta {
1256 key: key.to_string(),
1257 content_type: "application/octet-stream".to_string(),
1258 etag: "etag".to_string(),
1259 size,
1260 ..Default::default()
1261 }
1262 }
1263
1264 #[test]
1265 fn put_bucket_meta_roundtrip() {
1266 let tmp = TempDir::new().unwrap();
1267 let store = new_store(&tmp);
1268 let meta = BucketMeta {
1269 name: "b1".to_string(),
1270 region: "us-east-1".to_string(),
1271 versioning: Some("Enabled".to_string()),
1272 ..Default::default()
1273 };
1274 store.put_bucket_meta("b1", &meta).unwrap();
1275 let loaded = store.load().unwrap();
1276 let snap = loaded.buckets.get("b1").unwrap();
1277 assert_eq!(snap.meta.name, "b1");
1278 assert_eq!(snap.meta.region, "us-east-1");
1279 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1280 }
1281
1282 #[test]
1283 fn put_bucket_subresource_each_variant_writes_file() {
1284 let tmp = TempDir::new().unwrap();
1285 let store = new_store(&tmp);
1286 store
1287 .put_bucket_meta(
1288 "b",
1289 &BucketMeta {
1290 name: "b".to_string(),
1291 ..Default::default()
1292 },
1293 )
1294 .unwrap();
1295 let variants = [
1296 BucketSubresource::Tags,
1297 BucketSubresource::Lifecycle,
1298 BucketSubresource::Cors,
1299 BucketSubresource::Policy,
1300 BucketSubresource::Notification,
1301 BucketSubresource::Logging,
1302 BucketSubresource::Website,
1303 BucketSubresource::PublicAccessBlock,
1304 BucketSubresource::ObjectLock,
1305 BucketSubresource::Replication,
1306 BucketSubresource::Ownership,
1307 BucketSubresource::Inventory,
1308 BucketSubresource::Encryption,
1309 BucketSubresource::Versioning,
1310 BucketSubresource::Acl,
1311 BucketSubresource::Accelerate,
1312 ];
1313 for v in variants {
1314 store
1315 .put_bucket_subresource("b", v, "payload=true")
1316 .unwrap();
1317 let file = store
1318 .bucket_dir("b")
1319 .join(DiskS3Store::subresource_filename(v));
1320 assert!(file.exists(), "{:?}", v);
1321 assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1322 }
1323 }
1324
1325 #[test]
1326 fn delete_bucket_subresource_removes_file() {
1327 let tmp = TempDir::new().unwrap();
1328 let store = new_store(&tmp);
1329 store
1330 .put_bucket_meta(
1331 "b",
1332 &BucketMeta {
1333 name: "b".to_string(),
1334 ..Default::default()
1335 },
1336 )
1337 .unwrap();
1338 store
1339 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1340 .unwrap();
1341 store
1342 .delete_bucket_subresource("b", BucketSubresource::Tags)
1343 .unwrap();
1344 let file = store.bucket_dir("b").join("tags.toml");
1345 assert!(!file.exists());
1346 store
1348 .delete_bucket_subresource("b", BucketSubresource::Tags)
1349 .unwrap();
1350 }
1351
1352 #[test]
1353 fn put_object_bytes_roundtrip() {
1354 let tmp = TempDir::new().unwrap();
1355 let store = new_store(&tmp);
1356 store
1357 .put_bucket_meta(
1358 "b",
1359 &BucketMeta {
1360 name: "b".to_string(),
1361 ..Default::default()
1362 },
1363 )
1364 .unwrap();
1365 let data = Bytes::from_static(b"hello world");
1366 let meta = sample_meta("k1", data.len() as u64);
1367 let body_ref = store
1368 .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1369 .unwrap();
1370 match &body_ref {
1371 BodyRef::Disk {
1372 bucket,
1373 key,
1374 size,
1375 path,
1376 ..
1377 } => {
1378 assert_eq!(bucket, "b");
1379 assert_eq!(key, "k1");
1380 assert_eq!(*size, data.len() as u64);
1381 assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1382 }
1383 _ => panic!("expected Disk"),
1384 }
1385 let loaded = store.load().unwrap();
1386 let snap = loaded.buckets.get("b").unwrap();
1387 let obj = snap.objects.get("k1").unwrap();
1388 assert_eq!(obj.meta.size, data.len() as u64);
1389 }
1390
1391 #[test]
1392 fn put_object_file_copy_source_leaves_src_in_place() {
1393 let tmp = TempDir::new().unwrap();
1394 let store = new_store(&tmp);
1395 store
1396 .put_bucket_meta(
1397 "b",
1398 &BucketMeta {
1399 name: "b".to_string(),
1400 ..Default::default()
1401 },
1402 )
1403 .unwrap();
1404 let src_dir = TempDir::new().unwrap();
1405 let src = src_dir.path().join("src.bin");
1406 std::fs::write(&src, b"copied-body").unwrap();
1407 let meta = sample_meta("k", 11);
1408 let bref = store
1409 .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1410 .unwrap();
1411 match bref {
1412 BodyRef::Disk { path, size, .. } => {
1413 assert_eq!(size, 11);
1414 assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1415 }
1416 _ => panic!("expected Disk bodyref"),
1417 }
1418 assert!(src.exists(), "source file must not be moved by FileCopy");
1419 assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1420 }
1421
1422 #[test]
1423 fn put_object_file_source() {
1424 let tmp = TempDir::new().unwrap();
1425 let store = new_store(&tmp);
1426 store
1427 .put_bucket_meta(
1428 "b",
1429 &BucketMeta {
1430 name: "b".to_string(),
1431 ..Default::default()
1432 },
1433 )
1434 .unwrap();
1435 let src = tmp.path().join("src.bin");
1436 std::fs::write(&src, b"file-body").unwrap();
1437 let meta = sample_meta("k", 9);
1438 let body_ref = store
1439 .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1440 .unwrap();
1441 let path = match body_ref {
1442 BodyRef::Disk { path, .. } => path,
1443 _ => panic!(),
1444 };
1445 assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1446 }
1447
1448 #[test]
1449 fn put_object_meta_only_keeps_bin() {
1450 let tmp = TempDir::new().unwrap();
1451 let store = new_store(&tmp);
1452 store
1453 .put_bucket_meta(
1454 "b",
1455 &BucketMeta {
1456 name: "b".to_string(),
1457 ..Default::default()
1458 },
1459 )
1460 .unwrap();
1461 let data = Bytes::from_static(b"abc");
1462 let mut meta = sample_meta("k", 3);
1463 store
1464 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1465 .unwrap();
1466 let (_, bin, _) = store.object_paths("b", "k", None);
1467 let before = std::fs::read(&bin).unwrap();
1468 meta.tags.insert("x".to_string(), "y".to_string());
1469 store.put_object_meta("b", "k", None, &meta).unwrap();
1470 assert_eq!(std::fs::read(&bin).unwrap(), before);
1471 let loaded = store.load().unwrap();
1472 let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1473 assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1474 }
1475
1476 #[test]
1477 fn delete_object_cleans_up_files_and_cache() {
1478 let tmp = TempDir::new().unwrap();
1479 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1480 store
1481 .put_bucket_meta(
1482 "b",
1483 &BucketMeta {
1484 name: "b".to_string(),
1485 ..Default::default()
1486 },
1487 )
1488 .unwrap();
1489 let data = Bytes::from_static(b"bye");
1490 let meta = sample_meta("k", 3);
1491 store
1492 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1493 .unwrap();
1494 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1495 assert!(cache.get(&body_key).is_some());
1496 store.delete_object("b", "k", None).unwrap();
1497 let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1498 assert!(!bin.exists());
1499 assert!(!toml_path.exists());
1500 assert!(!dir.exists());
1501 assert!(cache.get(&body_key).is_none());
1502 }
1503
1504 #[test]
1505 fn open_object_body_cache_hit_and_refill() {
1506 let tmp = TempDir::new().unwrap();
1507 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1508 store
1509 .put_bucket_meta(
1510 "b",
1511 &BucketMeta {
1512 name: "b".to_string(),
1513 ..Default::default()
1514 },
1515 )
1516 .unwrap();
1517 let data = Bytes::from_static(b"payload");
1518 let meta = sample_meta("k", data.len() as u64);
1519 let body_ref = store
1520 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1521 .unwrap();
1522 let got = store.open_object_body(&body_ref).unwrap();
1524 assert_eq!(got, data);
1525 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1527 cache.invalidate(&body_key);
1528 assert!(cache.get(&body_key).is_none());
1529 let got = store.open_object_body(&body_ref).unwrap();
1530 assert_eq!(got, data);
1531 assert!(cache.get(&body_key).is_some());
1532 }
1533
1534 #[test]
1535 fn open_object_body_large_bypasses_cache() {
1536 let tmp = TempDir::new().unwrap();
1537 let (store, cache) = new_store_with_cache(&tmp, 1024);
1539 store
1540 .put_bucket_meta(
1541 "b",
1542 &BucketMeta {
1543 name: "b".to_string(),
1544 ..Default::default()
1545 },
1546 )
1547 .unwrap();
1548 let data = Bytes::from(vec![7u8; 800]);
1549 let meta = sample_meta("big", 800);
1550 let body_ref = store
1551 .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1552 .unwrap();
1553 let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1554 assert!(cache.get(&body_key).is_none());
1555 let got = store.open_object_body(&body_ref).unwrap();
1556 assert_eq!(got, data);
1557 assert!(cache.get(&body_key).is_none());
1559 }
1560
1561 #[test]
1562 fn load_empty_dir() {
1563 let tmp = TempDir::new().unwrap();
1564 let store = new_store(&tmp);
1565 let s = store.load().unwrap();
1566 assert!(s.buckets.is_empty());
1567 }
1568
1569 #[test]
1570 fn load_skips_mpu_without_init() {
1571 let tmp = TempDir::new().unwrap();
1572 let store = new_store(&tmp);
1573 store
1574 .put_bucket_meta(
1575 "b",
1576 &BucketMeta {
1577 name: "b".to_string(),
1578 ..Default::default()
1579 },
1580 )
1581 .unwrap();
1582 let data = Bytes::from_static(b"x");
1583 let meta = sample_meta("k", 1);
1584 store
1585 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1586 .unwrap();
1587 let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1589 std::fs::create_dir_all(&mpu).unwrap();
1590
1591 let loaded = store.load().unwrap();
1592 let snap = loaded.buckets.get("b").unwrap();
1593 assert_eq!(snap.objects.len(), 1);
1594 assert!(snap.multipart_uploads.is_empty());
1595 }
1596
1597 #[test]
1598 fn load_reads_bucket_subresources() {
1599 let tmp = TempDir::new().unwrap();
1600 let store = new_store(&tmp);
1601 store
1602 .put_bucket_meta(
1603 "b",
1604 &BucketMeta {
1605 name: "b".to_string(),
1606 ..Default::default()
1607 },
1608 )
1609 .unwrap();
1610 store
1611 .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1612 .unwrap();
1613 store
1614 .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1615 .unwrap();
1616 store
1617 .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1618 .unwrap();
1619 store
1620 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1621 .unwrap();
1622 let loaded = store.load().unwrap();
1623 let snap = loaded.buckets.get("b").unwrap();
1624 assert_eq!(
1625 snap.subresources.get("lifecycle.toml").map(String::as_str),
1626 Some("<Lifecycle/>"),
1627 );
1628 assert_eq!(
1629 snap.subresources.get("cors.toml").map(String::as_str),
1630 Some("<Cors/>"),
1631 );
1632 assert_eq!(
1633 snap.subresources.get("policy.toml").map(String::as_str),
1634 Some("{}"),
1635 );
1636 assert_eq!(
1637 snap.subresources.get("tags.toml").map(String::as_str),
1638 Some("x=1"),
1639 );
1640 }
1641
1642 #[test]
1643 fn versioned_put_load_roundtrip() {
1644 let tmp = TempDir::new().unwrap();
1645 let store = new_store(&tmp);
1646 store
1647 .put_bucket_meta(
1648 "b",
1649 &BucketMeta {
1650 name: "b".to_string(),
1651 versioning: Some("Enabled".to_string()),
1652 ..Default::default()
1653 },
1654 )
1655 .unwrap();
1656 let base = chrono::Utc::now();
1657 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1658 .iter()
1659 .enumerate()
1660 {
1661 let mut m = sample_meta("k", body.len() as u64);
1662 m.version_id = Some((*vid).to_string());
1663 m.last_modified = base + chrono::Duration::seconds(i as i64);
1664 store
1665 .put_object(
1666 "b",
1667 "k",
1668 Some(*vid),
1669 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1670 &m,
1671 )
1672 .unwrap();
1673 }
1674 let loaded = store.load().unwrap();
1675 let snap = loaded.buckets.get("b").unwrap();
1676 let versions = snap.object_versions.get("k").expect("versions present");
1677 assert_eq!(versions.len(), 3);
1678 assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1679 assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1680 assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1681 for v in versions {
1682 match &v.body {
1683 BodyRef::Disk { size, .. } => assert!(*size > 0),
1684 _ => panic!("expected Disk body"),
1685 }
1686 }
1687 }
1688
1689 #[test]
1690 fn versioned_load_promotes_latest_live_to_snap_objects() {
1691 let tmp = TempDir::new().unwrap();
1692 let store = new_store(&tmp);
1693 store
1694 .put_bucket_meta(
1695 "b",
1696 &BucketMeta {
1697 name: "b".to_string(),
1698 versioning: Some("Enabled".to_string()),
1699 ..Default::default()
1700 },
1701 )
1702 .unwrap();
1703 let base = chrono::Utc::now();
1704 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1705 .iter()
1706 .enumerate()
1707 {
1708 let mut m = sample_meta("k", body.len() as u64);
1709 m.version_id = Some((*vid).to_string());
1710 m.last_modified = base + chrono::Duration::seconds(i as i64);
1711 store
1712 .put_object(
1713 "b",
1714 "k",
1715 Some(*vid),
1716 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1717 &m,
1718 )
1719 .unwrap();
1720 }
1721 let loaded = store.load().unwrap();
1722 let snap = loaded.buckets.get("b").unwrap();
1723 let current = snap.objects.get("k").expect("current object promoted");
1724 assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1725 }
1726
1727 #[test]
1728 fn versioned_load_trailing_delete_marker_hides_current() {
1729 let tmp = TempDir::new().unwrap();
1730 let store = new_store(&tmp);
1731 store
1732 .put_bucket_meta(
1733 "b",
1734 &BucketMeta {
1735 name: "b".to_string(),
1736 versioning: Some("Enabled".to_string()),
1737 ..Default::default()
1738 },
1739 )
1740 .unwrap();
1741 let base = chrono::Utc::now();
1742 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1743 .iter()
1744 .enumerate()
1745 {
1746 let mut m = sample_meta("k", body.len() as u64);
1747 m.version_id = Some((*vid).to_string());
1748 m.last_modified = base + chrono::Duration::seconds(i as i64);
1749 store
1750 .put_object(
1751 "b",
1752 "k",
1753 Some(*vid),
1754 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1755 &m,
1756 )
1757 .unwrap();
1758 }
1759 let mut dm = sample_meta("k", 0);
1761 dm.version_id = Some("dm1".to_string());
1762 dm.is_delete_marker = true;
1763 dm.last_modified = base + chrono::Duration::seconds(10);
1764 store
1765 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1766 .unwrap();
1767 let loaded = store.load().unwrap();
1768 let snap = loaded.buckets.get("b").unwrap();
1769 assert!(
1770 !snap.objects.contains_key("k"),
1771 "trailing delete marker must hide current object",
1772 );
1773 assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1774 }
1775
1776 #[test]
1777 fn legacy_null_object_overridden_by_newer_versions() {
1778 let tmp = TempDir::new().unwrap();
1781 let store = new_store(&tmp);
1782 store
1783 .put_bucket_meta(
1784 "b",
1785 &BucketMeta {
1786 name: "b".to_string(),
1787 ..Default::default()
1788 },
1789 )
1790 .unwrap();
1791 let base = chrono::Utc::now();
1792 let mut null_meta = sample_meta("k", 3);
1793 null_meta.last_modified = base;
1794 store
1795 .put_object(
1796 "b",
1797 "k",
1798 None,
1799 BodySource::Bytes(Bytes::from_static(b"old")),
1800 &null_meta,
1801 )
1802 .unwrap();
1803 store
1805 .put_bucket_meta(
1806 "b",
1807 &BucketMeta {
1808 name: "b".to_string(),
1809 versioning: Some("Enabled".to_string()),
1810 ..Default::default()
1811 },
1812 )
1813 .unwrap();
1814 let mut v1 = sample_meta("k", 3);
1815 v1.version_id = Some("v1".to_string());
1816 v1.last_modified = base + chrono::Duration::seconds(1);
1817 store
1818 .put_object(
1819 "b",
1820 "k",
1821 Some("v1"),
1822 BodySource::Bytes(Bytes::from_static(b"new")),
1823 &v1,
1824 )
1825 .unwrap();
1826 let mut v2 = sample_meta("k", 5);
1827 v2.version_id = Some("v2".to_string());
1828 v2.last_modified = base + chrono::Duration::seconds(2);
1829 store
1830 .put_object(
1831 "b",
1832 "k",
1833 Some("v2"),
1834 BodySource::Bytes(Bytes::from_static(b"newer")),
1835 &v2,
1836 )
1837 .unwrap();
1838 let loaded = store.load().unwrap();
1839 let snap = loaded.buckets.get("b").unwrap();
1840 let current = snap
1841 .objects
1842 .get("k")
1843 .expect("latest live version must override stale null");
1844 assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1845 assert_eq!(current.meta.size, 5);
1846 }
1847
1848 #[test]
1849 fn legacy_null_hidden_by_trailing_delete_marker() {
1850 let tmp = TempDir::new().unwrap();
1851 let store = new_store(&tmp);
1852 store
1853 .put_bucket_meta(
1854 "b",
1855 &BucketMeta {
1856 name: "b".to_string(),
1857 ..Default::default()
1858 },
1859 )
1860 .unwrap();
1861 let base = chrono::Utc::now();
1862 let mut null_meta = sample_meta("k", 3);
1863 null_meta.last_modified = base;
1864 store
1865 .put_object(
1866 "b",
1867 "k",
1868 None,
1869 BodySource::Bytes(Bytes::from_static(b"old")),
1870 &null_meta,
1871 )
1872 .unwrap();
1873 store
1874 .put_bucket_meta(
1875 "b",
1876 &BucketMeta {
1877 name: "b".to_string(),
1878 versioning: Some("Enabled".to_string()),
1879 ..Default::default()
1880 },
1881 )
1882 .unwrap();
1883 let mut v1 = sample_meta("k", 3);
1884 v1.version_id = Some("v1".to_string());
1885 v1.last_modified = base + chrono::Duration::seconds(1);
1886 store
1887 .put_object(
1888 "b",
1889 "k",
1890 Some("v1"),
1891 BodySource::Bytes(Bytes::from_static(b"new")),
1892 &v1,
1893 )
1894 .unwrap();
1895 let mut dm = sample_meta("k", 0);
1896 dm.version_id = Some("dm1".to_string());
1897 dm.is_delete_marker = true;
1898 dm.last_modified = base + chrono::Duration::seconds(2);
1899 store
1900 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1901 .unwrap();
1902 let loaded = store.load().unwrap();
1903 let snap = loaded.buckets.get("b").unwrap();
1904 assert!(
1905 !snap.objects.contains_key("k"),
1906 "trailing delete marker must hide even a legacy null object",
1907 );
1908 }
1909
1910 #[test]
1911 fn delete_marker_roundtrip_no_body_file() {
1912 let tmp = TempDir::new().unwrap();
1913 let store = new_store(&tmp);
1914 store
1915 .put_bucket_meta(
1916 "b",
1917 &BucketMeta {
1918 name: "b".to_string(),
1919 versioning: Some("Enabled".to_string()),
1920 ..Default::default()
1921 },
1922 )
1923 .unwrap();
1924 let mut m = sample_meta("k", 0);
1925 m.version_id = Some("dm1".to_string());
1926 m.is_delete_marker = true;
1927 store
1928 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1929 .unwrap();
1930 let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1932 assert!(!bin.exists(), "delete marker must not have a .bin file");
1933 assert!(toml_path.exists());
1934
1935 let loaded = store.load().unwrap();
1936 let versions = loaded
1937 .buckets
1938 .get("b")
1939 .unwrap()
1940 .object_versions
1941 .get("k")
1942 .unwrap();
1943 assert_eq!(versions.len(), 1);
1944 assert!(versions[0].meta.is_delete_marker);
1945 match &versions[0].body {
1946 BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1947 _ => panic!("delete marker body should be empty Memory"),
1948 }
1949 }
1950
1951 #[test]
1952 fn mixed_nonversioned_and_versioned_buckets() {
1953 let tmp = TempDir::new().unwrap();
1954 let store = new_store(&tmp);
1955 store
1956 .put_bucket_meta(
1957 "a",
1958 &BucketMeta {
1959 name: "a".to_string(),
1960 ..Default::default()
1961 },
1962 )
1963 .unwrap();
1964 store
1965 .put_bucket_meta(
1966 "b",
1967 &BucketMeta {
1968 name: "b".to_string(),
1969 versioning: Some("Enabled".to_string()),
1970 ..Default::default()
1971 },
1972 )
1973 .unwrap();
1974 let ma = sample_meta("only", 3);
1975 store
1976 .put_object(
1977 "a",
1978 "only",
1979 None,
1980 BodySource::Bytes(Bytes::from_static(b"aaa")),
1981 &ma,
1982 )
1983 .unwrap();
1984 let base = chrono::Utc::now();
1985 for (i, vid) in ["v1", "v2"].iter().enumerate() {
1986 let mut m = sample_meta("twice", 2);
1987 m.version_id = Some((*vid).to_string());
1988 m.last_modified = base + chrono::Duration::seconds(i as i64);
1989 store
1990 .put_object(
1991 "b",
1992 "twice",
1993 Some(*vid),
1994 BodySource::Bytes(Bytes::from_static(b"xx")),
1995 &m,
1996 )
1997 .unwrap();
1998 }
1999 let loaded = store.load().unwrap();
2000 assert_eq!(loaded.buckets.len(), 2);
2001 let a = loaded.buckets.get("a").unwrap();
2002 assert_eq!(a.objects.len(), 1);
2003 assert!(a.object_versions.is_empty());
2004 let b = loaded.buckets.get("b").unwrap();
2005 assert_eq!(
2008 b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2009 Some("v2")
2010 );
2011 assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2012 }
2013
2014 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2015 MpuInit {
2016 upload_id: upload_id.to_string(),
2017 key: key.to_string(),
2018 initiated: chrono::Utc::now(),
2019 content_type: "application/octet-stream".to_string(),
2020 storage_class: "STANDARD".to_string(),
2021 ..Default::default()
2022 }
2023 }
2024
2025 #[test]
2026 fn mpu_create_then_load_empty_parts() {
2027 let tmp = TempDir::new().unwrap();
2028 let store = new_store(&tmp);
2029 store
2030 .put_bucket_meta(
2031 "b",
2032 &BucketMeta {
2033 name: "b".to_string(),
2034 ..Default::default()
2035 },
2036 )
2037 .unwrap();
2038 let init = sample_mpu_init("up1", "k1");
2039 store.mpu_create("b", "up1", &init).unwrap();
2040 let loaded = store.load().unwrap();
2041 let snap = loaded.buckets.get("b").unwrap();
2042 let m = snap.multipart_uploads.get("up1").expect("upload present");
2043 assert_eq!(m.init.upload_id, "up1");
2044 assert_eq!(m.init.key, "k1");
2045 assert!(m.parts.is_empty());
2046 }
2047
2048 #[test]
2049 fn mpu_put_part_then_load_three_parts() {
2050 let tmp = TempDir::new().unwrap();
2051 let store = new_store(&tmp);
2052 store
2053 .put_bucket_meta(
2054 "b",
2055 &BucketMeta {
2056 name: "b".to_string(),
2057 ..Default::default()
2058 },
2059 )
2060 .unwrap();
2061 store
2062 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2063 .unwrap();
2064 let bodies = [
2065 Bytes::from_static(b"part-one"),
2066 Bytes::from_static(b"part-two-longer"),
2067 Bytes::from_static(b"p3"),
2068 ];
2069 for (i, body) in bodies.iter().enumerate() {
2070 let n = (i + 1) as u32;
2071 store
2072 .mpu_put_part(
2073 "b",
2074 "up",
2075 n,
2076 BodySource::Bytes(body.clone()),
2077 &format!("et{}", n),
2078 )
2079 .unwrap();
2080 }
2081 let loaded = store.load().unwrap();
2082 let snap = loaded.buckets.get("b").unwrap();
2083 let m = snap.multipart_uploads.get("up").unwrap();
2084 assert_eq!(m.parts.len(), 3);
2085 for (i, body) in bodies.iter().enumerate() {
2086 let n = (i + 1) as u32;
2087 let part = m.parts.get(&n).unwrap();
2088 assert_eq!(part.meta.part_number, n);
2089 assert_eq!(part.meta.size, body.len() as u64);
2090 assert_eq!(part.meta.etag, format!("et{}", n));
2091 match &part.body {
2092 BodyRef::Disk { path, size, .. } => {
2093 assert_eq!(*size, body.len() as u64);
2094 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2095 }
2096 _ => panic!("expected Disk"),
2097 }
2098 }
2099 }
2100
2101 #[test]
2102 fn mpu_abort_removes_upload() {
2103 let tmp = TempDir::new().unwrap();
2104 let store = new_store(&tmp);
2105 store
2106 .put_bucket_meta(
2107 "b",
2108 &BucketMeta {
2109 name: "b".to_string(),
2110 ..Default::default()
2111 },
2112 )
2113 .unwrap();
2114 store
2115 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2116 .unwrap();
2117 store
2118 .mpu_put_part(
2119 "b",
2120 "up",
2121 1,
2122 BodySource::Bytes(Bytes::from_static(b"x")),
2123 "e",
2124 )
2125 .unwrap();
2126 store.mpu_abort("b", "up").unwrap();
2127 assert!(!store.mpu_dir("b", "up").exists());
2128 store.mpu_abort("b", "up").unwrap();
2130 let loaded = store.load().unwrap();
2131 let snap = loaded.buckets.get("b").unwrap();
2132 assert!(snap.multipart_uploads.is_empty());
2133 }
2134
2135 #[test]
2136 fn mpu_complete_single_part_fast_path() {
2137 let tmp = TempDir::new().unwrap();
2138 let store = new_store(&tmp);
2139 store
2140 .put_bucket_meta(
2141 "b",
2142 &BucketMeta {
2143 name: "b".to_string(),
2144 ..Default::default()
2145 },
2146 )
2147 .unwrap();
2148 store
2149 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2150 .unwrap();
2151 let body = Bytes::from_static(b"only-part-bytes");
2152 store
2153 .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2154 .unwrap();
2155 let meta = sample_meta("k", body.len() as u64);
2156 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2157 match &body_ref {
2158 BodyRef::Disk { path, size, .. } => {
2159 assert_eq!(*size, body.len() as u64);
2160 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2161 }
2162 _ => panic!("expected Disk"),
2163 }
2164 assert!(!store.mpu_dir("b", "up").exists());
2165 }
2166
2167 #[test]
2168 fn mpu_complete_multi_part_concat() {
2169 let tmp = TempDir::new().unwrap();
2170 let store = new_store(&tmp);
2171 store
2172 .put_bucket_meta(
2173 "b",
2174 &BucketMeta {
2175 name: "b".to_string(),
2176 ..Default::default()
2177 },
2178 )
2179 .unwrap();
2180 store
2181 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2182 .unwrap();
2183 let p1 = Bytes::from_static(b"AAAA");
2184 let p2 = Bytes::from_static(b"BBBBBB");
2185 let p3 = Bytes::from_static(b"CC");
2186 for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2187 store
2188 .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2189 .unwrap();
2190 }
2191 let mut expected = Vec::new();
2192 expected.extend_from_slice(&p1);
2193 expected.extend_from_slice(&p2);
2194 expected.extend_from_slice(&p3);
2195 let meta = sample_meta("k", expected.len() as u64);
2196 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2197 let path = match body_ref {
2198 BodyRef::Disk { path, size, .. } => {
2199 assert_eq!(size, expected.len() as u64);
2200 path
2201 }
2202 _ => panic!("expected Disk"),
2203 };
2204 assert_eq!(std::fs::read(&path).unwrap(), expected);
2205 assert!(!store.mpu_dir("b", "up").exists());
2206 }
2207
2208 #[test]
2209 fn mpu_complete_large_streaming_via_file_source() {
2210 let tmp = TempDir::new().unwrap();
2211 let store = new_store(&tmp);
2212 store
2213 .put_bucket_meta(
2214 "b",
2215 &BucketMeta {
2216 name: "b".to_string(),
2217 ..Default::default()
2218 },
2219 )
2220 .unwrap();
2221 store
2222 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2223 .unwrap();
2224
2225 const PART_SIZE: usize = 1024 * 1024;
2228 let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2229 let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2230 for (i, byte) in patterns.iter().enumerate() {
2231 let src = tmp.path().join(format!("src-{}.bin", i + 1));
2232 let data = vec![*byte; PART_SIZE];
2233 std::fs::write(&src, &data).unwrap();
2234 expected.extend_from_slice(&data);
2235 store
2236 .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2237 .unwrap();
2238 }
2239 let meta = sample_meta("k", expected.len() as u64);
2240 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2241 let path = match body_ref {
2242 BodyRef::Disk { path, size, .. } => {
2243 assert_eq!(size, expected.len() as u64);
2244 path
2245 }
2246 _ => panic!("expected Disk"),
2247 };
2248 let actual = std::fs::read(&path).unwrap();
2249 assert_eq!(actual.len(), expected.len());
2250 assert_eq!(actual, expected);
2251 assert!(!store.mpu_dir("b", "up").exists());
2252 }
2253
2254 #[test]
2255 fn mpu_resumable_across_load() {
2256 let tmp = TempDir::new().unwrap();
2257 let store = new_store(&tmp);
2258 store
2259 .put_bucket_meta(
2260 "b",
2261 &BucketMeta {
2262 name: "b".to_string(),
2263 ..Default::default()
2264 },
2265 )
2266 .unwrap();
2267 store
2268 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2269 .unwrap();
2270 let p1 = Bytes::from_static(b"hello-");
2271 let p2 = Bytes::from_static(b"world-");
2272 store
2273 .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2274 .unwrap();
2275 store
2276 .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2277 .unwrap();
2278
2279 let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2281 let loaded = store2.load().unwrap();
2282 let snap = loaded.buckets.get("b").unwrap();
2283 let m = snap.multipart_uploads.get("up").unwrap();
2284 assert_eq!(m.parts.len(), 2);
2285 assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2286 assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2287
2288 let p3 = Bytes::from_static(b"again!");
2290 store2
2291 .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2292 .unwrap();
2293 let mut expected = Vec::new();
2294 expected.extend_from_slice(&p1);
2295 expected.extend_from_slice(&p2);
2296 expected.extend_from_slice(&p3);
2297 let meta = sample_meta("k", expected.len() as u64);
2298 let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2299 let path = match body_ref {
2300 BodyRef::Disk { path, .. } => path,
2301 _ => panic!(),
2302 };
2303 assert_eq!(std::fs::read(&path).unwrap(), expected);
2304 assert!(!store2.mpu_dir("b", "up").exists());
2305 }
2306
2307 #[test]
2308 fn tags_snapshot_roundtrip_via_store() {
2309 let tmp = TempDir::new().unwrap();
2310 let store = new_store(&tmp);
2311 store
2312 .put_bucket_meta(
2313 "b",
2314 &BucketMeta {
2315 name: "b".to_string(),
2316 ..Default::default()
2317 },
2318 )
2319 .unwrap();
2320 let mut tags = BTreeMap::new();
2321 tags.insert("env".to_string(), "prod".to_string());
2322 tags.insert("team".to_string(), "s3".to_string());
2323 let snap = TagsSnapshot { tags: tags.clone() };
2324 let payload = toml::to_string(&snap).unwrap();
2325 store
2326 .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2327 .unwrap();
2328 let loaded = store.load().unwrap();
2329 let text = loaded
2330 .buckets
2331 .get("b")
2332 .unwrap()
2333 .subresources
2334 .get("tags.toml")
2335 .cloned()
2336 .unwrap();
2337 let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2338 assert_eq!(decoded.tags, tags);
2339 }
2340
/// Round-trips an `AclSnapshot` through the store's bucket-subresource path.
///
/// A snapshot with two grants (a canonical-user grant carrying an id and
/// display name, and a group grant carrying only a URI) is serialised to
/// TOML, written as the `Acl` subresource of bucket `b`, and read back
/// through `load()` under the `acl.toml` key.
///
/// Every field of every grant is compared against the snapshot that was
/// written, so a regression that drops or mangles an optional grantee field
/// (`grantee_id`, `grantee_display_name`, `grantee_uri`) is caught rather
/// than slipping past a partial spot-check.
#[test]
fn acl_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    store
        .put_bucket_meta(
            "b",
            &BucketMeta {
                name: "b".to_string(),
                ..Default::default()
            },
        )
        .unwrap();
    let snap = AclSnapshot {
        owner_id: "owner-abc".to_string(),
        grants: vec![
            AclGrantSnapshot {
                grantee_type: "CanonicalUser".to_string(),
                grantee_id: Some("owner-abc".to_string()),
                grantee_display_name: Some("owner".to_string()),
                grantee_uri: None,
                permission: "FULL_CONTROL".to_string(),
            },
            AclGrantSnapshot {
                grantee_type: "Group".to_string(),
                grantee_id: None,
                grantee_display_name: None,
                grantee_uri: Some(
                    "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
                ),
                permission: "READ".to_string(),
            },
        ],
    };
    let payload = toml::to_string(&snap).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
        .unwrap();
    let loaded = store.load().unwrap();
    let text = loaded
        .buckets
        .get("b")
        .unwrap()
        .subresources
        .get("acl.toml")
        .cloned()
        .unwrap();
    let decoded: AclSnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    // Field-by-field comparison, in order, against what was written. The
    // length check above guarantees `zip` does not silently truncate.
    for (idx, (got, want)) in decoded.grants.iter().zip(&snap.grants).enumerate() {
        assert_eq!(
            got.grantee_type, want.grantee_type,
            "grant {}: grantee_type",
            idx
        );
        assert_eq!(got.grantee_id, want.grantee_id, "grant {}: grantee_id", idx);
        assert_eq!(
            got.grantee_display_name, want.grantee_display_name,
            "grant {}: grantee_display_name",
            idx
        );
        assert_eq!(
            got.grantee_uri, want.grantee_uri,
            "grant {}: grantee_uri",
            idx
        );
        assert_eq!(got.permission, want.permission, "grant {}: permission", idx);
    }
}
2394
/// Round-trips an `InventorySnapshot` through the store's bucket-subresource
/// path.
///
/// Two inventory configurations keyed by id are serialised to TOML, written
/// as the `Inventory` subresource of bucket `b`, and read back through
/// `load()` under the `inventory.toml` key. The decoded config map must equal
/// the map that was written.
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // id -> raw XML configuration document.
    let expected_configs: BTreeMap<String, String> = [
        ("inv-1", "<InventoryConfiguration id=\"inv-1\"/>"),
        ("inv-2", "<InventoryConfiguration id=\"inv-2\"/>"),
    ]
    .iter()
    .map(|&(id, xml)| (id.to_string(), xml.to_string()))
    .collect();
    let serialized = toml::to_string(&InventorySnapshot {
        configs: expected_configs.clone(),
    })
    .unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &serialized)
        .unwrap();

    // Reload from the store and decode the persisted TOML text.
    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored_text = bucket.subresources.get("inventory.toml").cloned().unwrap();
    let roundtripped: InventorySnapshot = toml::from_str(&stored_text).unwrap();
    assert_eq!(roundtripped.configs, expected_configs);
}
2436
/// Checks that a `versioning.toml` file holding just the text `Enabled` is
/// picked up by `load()`.
///
/// After creating bucket `b`, the file is written directly into the bucket
/// directory (bypassing the store API), and the reloaded bucket's
/// `meta.versioning` must read back as `Some("Enabled")`.
#[test]
fn legacy_versioning_file_is_read() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Drop the versioning file in by hand, next to the bucket's other files.
    let versioning_file = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&versioning_file, "Enabled").unwrap();

    let state = store.load().unwrap();
    let versioning = state.buckets.get("b").unwrap().meta.versioning.as_deref();
    assert_eq!(versioning, Some("Enabled"));
}
2458
/// Checks that one unreadable bucket does not prevent the rest from loading.
///
/// A well-formed bucket `good` (with one object) is created through the
/// store API, then a sibling directory `bad` is created by hand with a
/// `meta.toml` that is not valid TOML. `load()` must succeed, include
/// `good`, and omit `bad`.
#[test]
fn load_skips_corrupt_bucket_and_keeps_others() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    // Healthy bucket with a single two-byte object.
    let good_meta = BucketMeta {
        name: String::from("good"),
        ..Default::default()
    };
    store.put_bucket_meta("good", &good_meta).unwrap();
    let body = BodySource::Bytes(Bytes::from_static(b"hi"));
    let object_meta = sample_meta("k", 2);
    store
        .put_object("good", "k", None, body, &object_meta)
        .unwrap();

    // Hand-crafted bucket directory whose metadata cannot be parsed.
    let corrupt_dir = store.buckets_dir().join("bad");
    std::fs::create_dir_all(&corrupt_dir).unwrap();
    std::fs::write(corrupt_dir.join("meta.toml"), b"not valid toml = = =").unwrap();

    let state = store.load().unwrap();
    let has_good = state.buckets.contains_key("good");
    let has_bad = state.buckets.contains_key("bad");
    assert!(has_good, "valid bucket must load");
    assert!(
        !has_bad,
        "corrupt bucket must be skipped, not abort the load"
    );
}
2498}