1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11 #[serde(default)]
12 pub account_id: String,
13 #[serde(default)]
14 pub region: String,
15 #[serde(default)]
16 pub buckets: BTreeMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21 pub meta: BucketMeta,
22 #[serde(default)]
23 pub objects: BTreeMap<String, LoadedObject>,
24 #[serde(default)]
25 pub object_versions: BTreeMap<String, Vec<LoadedObject>>,
26 #[serde(default)]
27 pub subresources: BTreeMap<String, String>,
28 #[serde(default)]
29 pub multipart_uploads: BTreeMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34 pub init: MpuInit,
35 #[serde(default)]
36 pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41 pub meta: UploadPartMeta,
42 #[serde(default)]
43 pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48 pub meta: ObjectMeta,
49 #[serde(default)]
50 pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55 #[serde(default)]
56 pub name: String,
57 #[serde(default = "default_time")]
58 pub creation_date: DateTime<Utc>,
59 #[serde(default)]
60 pub region: String,
61 #[serde(default)]
62 pub versioning: Option<String>,
63 #[serde(default)]
65 pub mfa_delete: Option<String>,
66 #[serde(default)]
67 pub acl: Option<String>,
68 #[serde(default)]
69 pub acl_owner_id: String,
70 #[serde(default)]
71 pub accelerate_status: Option<String>,
72 #[serde(default)]
73 pub eventbridge_enabled: bool,
74 #[serde(default)]
75 pub lifecycle_transition_default_min_size: Option<String>,
76}
77
78fn default_time() -> DateTime<Utc> {
79 DateTime::<Utc>::MIN_UTC
80}
81
82#[derive(Debug, Default, Clone, Serialize, Deserialize)]
83pub struct AclGrantSnapshot {
84 pub grantee_type: String,
85 #[serde(default)]
86 pub grantee_id: Option<String>,
87 #[serde(default)]
88 pub grantee_display_name: Option<String>,
89 #[serde(default)]
90 pub grantee_uri: Option<String>,
91 pub permission: String,
92}
93
94#[derive(Debug, Default, Clone, Serialize, Deserialize)]
95pub struct TagsSnapshot {
96 #[serde(default)]
97 pub tags: BTreeMap<String, String>,
98}
99
100#[derive(Debug, Default, Clone, Serialize, Deserialize)]
101pub struct AclSnapshot {
102 #[serde(default)]
103 pub owner_id: String,
104 #[serde(default)]
105 pub grants: Vec<AclGrantSnapshot>,
106}
107
108#[derive(Debug, Default, Clone, Serialize, Deserialize)]
109pub struct InventorySnapshot {
110 #[serde(default)]
111 pub configs: BTreeMap<String, String>,
112}
113
114#[derive(Debug, Default, Clone, Serialize, Deserialize)]
115pub struct ObjectMeta {
116 #[serde(default)]
117 pub key: String,
118 #[serde(default)]
119 pub content_type: String,
120 #[serde(default)]
121 pub etag: String,
122 #[serde(default)]
123 pub size: u64,
124 #[serde(default = "default_time")]
125 pub last_modified: DateTime<Utc>,
126 #[serde(default)]
127 pub metadata: BTreeMap<String, String>,
128 #[serde(default)]
129 pub tags: BTreeMap<String, String>,
130 #[serde(default)]
131 pub storage_class: String,
132 #[serde(default)]
133 pub acl_grants: Vec<AclGrantSnapshot>,
134 #[serde(default)]
135 pub acl_owner_id: Option<String>,
136 #[serde(default)]
137 pub parts_count: Option<u32>,
138 #[serde(default)]
139 pub part_sizes: Option<Vec<(u32, u64)>>,
140 #[serde(default)]
141 pub sse_algorithm: Option<String>,
142 #[serde(default)]
143 pub sse_kms_key_id: Option<String>,
144 #[serde(default)]
145 pub bucket_key_enabled: Option<bool>,
146 #[serde(default)]
147 pub version_id: Option<String>,
148 #[serde(default)]
149 pub is_delete_marker: bool,
150 #[serde(default)]
151 pub restore_ongoing: Option<bool>,
152 #[serde(default)]
153 pub restore_expiry: Option<String>,
154 #[serde(default)]
155 pub checksum_algorithm: Option<String>,
156 #[serde(default)]
157 pub checksum_value: Option<String>,
158 #[serde(default)]
159 pub lock_mode: Option<String>,
160 #[serde(default)]
161 pub lock_retain_until: Option<DateTime<Utc>>,
162 #[serde(default)]
163 pub lock_legal_hold: Option<String>,
164 #[serde(default)]
165 pub content_encoding: Option<String>,
166 #[serde(default)]
167 pub cache_control: Option<String>,
168 #[serde(default)]
169 pub content_disposition: Option<String>,
170 #[serde(default)]
171 pub content_language: Option<String>,
172 #[serde(default)]
173 pub expires: Option<String>,
174 #[serde(default)]
175 pub website_redirect_location: Option<String>,
176}
177
178#[derive(Debug, Default, Clone, Serialize, Deserialize)]
179pub struct MpuInit {
180 pub upload_id: String,
181 pub key: String,
182 #[serde(default = "default_time")]
183 pub initiated: DateTime<Utc>,
184 #[serde(default)]
185 pub metadata: BTreeMap<String, String>,
186 #[serde(default)]
187 pub content_type: String,
188 #[serde(default)]
189 pub storage_class: String,
190 #[serde(default)]
191 pub sse_algorithm: Option<String>,
192 #[serde(default)]
193 pub sse_kms_key_id: Option<String>,
194 #[serde(default)]
195 pub tagging: Option<String>,
196 #[serde(default)]
197 pub acl_grants: Vec<AclGrantSnapshot>,
198 #[serde(default)]
199 pub checksum_algorithm: Option<String>,
200 #[serde(default)]
201 pub cache_control: Option<String>,
202 #[serde(default)]
203 pub content_disposition: Option<String>,
204 #[serde(default)]
205 pub content_language: Option<String>,
206 #[serde(default)]
207 pub expires: Option<String>,
208}
209
210#[derive(Debug, Default, Clone, Serialize, Deserialize)]
211pub struct UploadPartMeta {
212 pub part_number: u32,
213 pub etag: String,
214 pub size: u64,
215 #[serde(default = "default_time")]
216 pub last_modified: DateTime<Utc>,
217}
218
/// The kinds of bucket subresource a store can persist.
///
/// Variant order is part of the type's observable behaviour (discriminant
/// values), so new variants should be appended rather than inserted.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
245
246pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
247 BucketSubresource::Tags,
248 BucketSubresource::Lifecycle,
249 BucketSubresource::Cors,
250 BucketSubresource::Policy,
251 BucketSubresource::Notification,
252 BucketSubresource::Logging,
253 BucketSubresource::Website,
254 BucketSubresource::PublicAccessBlock,
255 BucketSubresource::ObjectLock,
256 BucketSubresource::Replication,
257 BucketSubresource::Ownership,
258 BucketSubresource::Inventory,
259 BucketSubresource::Encryption,
260 BucketSubresource::Versioning,
261 BucketSubresource::Acl,
262 BucketSubresource::Accelerate,
263 BucketSubresource::Analytics,
264 BucketSubresource::IntelligentTiering,
265 BucketSubresource::Metrics,
266 BucketSubresource::RequestPayment,
267 BucketSubresource::Abac,
268 BucketSubresource::MetadataConfiguration,
269 BucketSubresource::MetadataTableConfiguration,
270];
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub enum BodyRef {
274 #[serde(skip)]
275 Memory(Bytes),
276 Disk {
277 bucket: String,
278 key: String,
279 #[serde(default)]
280 version: Option<String>,
281 path: PathBuf,
282 size: u64,
283 },
284}
285
286impl BodyRef {
287 pub fn size(&self) -> u64 {
288 match self {
289 BodyRef::Memory(b) => b.len() as u64,
290 BodyRef::Disk { size, .. } => *size,
291 }
292 }
293}
294
295impl Default for BodyRef {
296 fn default() -> Self {
297 BodyRef::Memory(Bytes::new())
298 }
299}
300
301#[derive(Debug)]
302pub enum BodySource {
303 Bytes(Bytes),
304 File(PathBuf),
307 FileCopy(PathBuf),
311}
312
313fn read_body_source(body: BodySource) -> StoreResult<Bytes> {
319 match body {
320 BodySource::Bytes(b) => Ok(b),
321 BodySource::File(p) => {
322 let bytes = std::fs::read(&p)?;
323 let _ = std::fs::remove_file(&p);
326 Ok(Bytes::from(bytes))
327 }
328 BodySource::FileCopy(p) => {
329 let bytes = std::fs::read(&p)?;
330 Ok(Bytes::from(bytes))
331 }
332 }
333}
334
335#[derive(Debug, Error)]
336pub enum StoreError {
337 #[error("io error: {0}")]
338 Io(#[from] std::io::Error),
339 #[error("serialization error: {0}")]
340 Serde(String),
341 #[error("not supported by this store")]
342 NotSupported,
343 #[error("{0}")]
344 Other(String),
345}
346
347pub type StoreResult<T> = Result<T, StoreError>;
348
349pub trait S3Store: Send + Sync {
350 fn load(&self) -> StoreResult<S3State>;
351
352 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
353 fn put_bucket_subresource(
354 &self,
355 bucket: &str,
356 kind: BucketSubresource,
357 payload: &str,
358 ) -> StoreResult<()>;
359 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
360 fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
361
362 fn put_object(
363 &self,
364 bucket: &str,
365 key: &str,
366 version: Option<&str>,
367 body: BodySource,
368 meta: &ObjectMeta,
369 ) -> StoreResult<BodyRef>;
370 fn put_object_meta(
371 &self,
372 bucket: &str,
373 key: &str,
374 version: Option<&str>,
375 meta: &ObjectMeta,
376 ) -> StoreResult<()>;
377 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
378 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
379
380 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
381 fn mpu_put_part(
387 &self,
388 bucket: &str,
389 upload_id: &str,
390 part_number: u32,
391 body: BodySource,
392 etag: &str,
393 ) -> StoreResult<BodyRef>;
394 fn spool_dir(&self) -> Option<std::path::PathBuf> {
401 None
402 }
403 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
404 fn mpu_complete(
405 &self,
406 bucket: &str,
407 upload_id: &str,
408 final_key: &str,
409 version: Option<&str>,
410 meta: &ObjectMeta,
411 ) -> StoreResult<BodyRef>;
412}
413
/// Store that persists nothing: the type carries no state of its own.
#[derive(Default)]
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the (stateless) in-memory store; equivalent to `default()`.
    pub fn new() -> Self {
        MemoryS3Store
    }
}
427
428impl S3Store for MemoryS3Store {
429 fn load(&self) -> StoreResult<S3State> {
430 Ok(S3State::default())
431 }
432
433 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
434 Ok(())
435 }
436 fn put_bucket_subresource(
437 &self,
438 _bucket: &str,
439 _kind: BucketSubresource,
440 _payload: &str,
441 ) -> StoreResult<()> {
442 Ok(())
443 }
444 fn delete_bucket_subresource(
445 &self,
446 _bucket: &str,
447 _kind: BucketSubresource,
448 ) -> StoreResult<()> {
449 Ok(())
450 }
451 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
452 Ok(())
453 }
454
455 fn put_object(
456 &self,
457 _bucket: &str,
458 _key: &str,
459 _version: Option<&str>,
460 body: BodySource,
461 _meta: &ObjectMeta,
462 ) -> StoreResult<BodyRef> {
463 Ok(BodyRef::Memory(read_body_source(body)?))
464 }
465 fn put_object_meta(
466 &self,
467 _bucket: &str,
468 _key: &str,
469 _version: Option<&str>,
470 _meta: &ObjectMeta,
471 ) -> StoreResult<()> {
472 Ok(())
473 }
474 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
475 Ok(())
476 }
477 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
478 match body {
479 BodyRef::Memory(b) => Ok(b.clone()),
480 BodyRef::Disk { .. } => {
481 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
482 }
483 }
484 }
485
486 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
487 Ok(())
488 }
489 fn mpu_put_part(
490 &self,
491 _bucket: &str,
492 _upload_id: &str,
493 _part_number: u32,
494 body: BodySource,
495 _etag: &str,
496 ) -> StoreResult<BodyRef> {
497 Ok(BodyRef::Memory(read_body_source(body)?))
500 }
501 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
502 Ok(())
503 }
504 fn mpu_complete(
505 &self,
506 _bucket: &str,
507 _upload_id: &str,
508 _final_key: &str,
509 _version: Option<&str>,
510 _meta: &ObjectMeta,
511 ) -> StoreResult<BodyRef> {
512 Ok(BodyRef::Memory(Bytes::new()))
513 }
514}
515
516pub struct DiskS3Store {
517 root: PathBuf,
518 cache: std::sync::Arc<crate::cache::BodyCache>,
519}
520
521impl DiskS3Store {
522 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
523 Self { root, cache }
524 }
525
526 fn buckets_dir(&self) -> PathBuf {
527 self.root.join("buckets")
528 }
529
530 fn bucket_dir(&self, bucket: &str) -> PathBuf {
531 self.buckets_dir()
532 .join(crate::key_escape::escape_key_segment(bucket))
533 }
534
535 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
536 self.bucket_dir(bucket)
537 .join("objects")
538 .join(crate::key_escape::escape_key_segment(key))
539 }
540
541 fn version_tag(version: Option<&str>) -> String {
542 version.unwrap_or("null").to_string()
543 }
544
545 fn object_paths(
546 &self,
547 bucket: &str,
548 key: &str,
549 version: Option<&str>,
550 ) -> (PathBuf, PathBuf, PathBuf) {
551 let dir = self.object_dir(bucket, key);
552 let tag = Self::version_tag(version);
553 let bin = dir.join(format!("{}.bin", tag));
554 let toml = dir.join(format!("{}.toml", tag));
555 (dir, bin, toml)
556 }
557
558 fn subresource_filename(kind: BucketSubresource) -> &'static str {
559 match kind {
560 BucketSubresource::Tags => "tags.toml",
561 BucketSubresource::Lifecycle => "lifecycle.toml",
562 BucketSubresource::Cors => "cors.toml",
563 BucketSubresource::Policy => "policy.toml",
564 BucketSubresource::Notification => "notification.toml",
565 BucketSubresource::Logging => "logging.toml",
566 BucketSubresource::Website => "website.toml",
567 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
568 BucketSubresource::ObjectLock => "object_lock.toml",
569 BucketSubresource::Replication => "replication.toml",
570 BucketSubresource::Ownership => "ownership.toml",
571 BucketSubresource::Inventory => "inventory.toml",
572 BucketSubresource::Encryption => "encryption.toml",
573 BucketSubresource::Versioning => "versioning.toml",
574 BucketSubresource::Acl => "acl.toml",
575 BucketSubresource::Accelerate => "accelerate.toml",
576 BucketSubresource::Analytics => "analytics.toml",
577 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
578 BucketSubresource::Metrics => "metrics.toml",
579 BucketSubresource::RequestPayment => "request_payment.toml",
580 BucketSubresource::Abac => "abac.toml",
581 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
582 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
583 }
584 }
585
586 fn cleanup_empty(dir: &std::path::Path) {
587 let _ = std::fs::remove_dir(dir);
588 }
589
590 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
591 self.bucket_dir(bucket)
592 .join("mpu")
593 .join(crate::key_escape::escape_key_segment(upload_id))
594 }
595
596 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
597 self.mpu_dir(bucket, upload_id).join("parts")
598 }
599
600 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
601 self.mpu_parts_dir(bucket, upload_id)
602 .join(format!("{}.bin", part_number))
603 }
604
605 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
606 self.mpu_parts_dir(bucket, upload_id)
607 .join(format!("{}.toml", part_number))
608 }
609
610 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
611 crate::cache::BodyKey::new(
612 bucket.to_string(),
613 format!("__mpu__/{}", upload_id),
614 Some(format!("part-{}", part_number)),
615 )
616 }
617}
618
619fn io_other(msg: impl Into<String>) -> StoreError {
620 StoreError::Other(msg.into())
621}
622
623impl S3Store for DiskS3Store {
624 fn load(&self) -> StoreResult<S3State> {
625 let mut state = S3State::default();
626 let buckets_dir = self.buckets_dir();
627 if !buckets_dir.exists() {
628 return Ok(state);
629 }
630 for entry in std::fs::read_dir(&buckets_dir)? {
631 let entry = entry?;
632 if !entry.file_type()?.is_dir() {
633 continue;
634 }
635 let bdir = entry.path();
636 let loaded: StoreResult<Option<BucketSnapshot>> = (|| {
641 let meta_path = bdir.join("meta.toml");
642 if !meta_path.exists() {
643 return Ok(None);
644 }
645 let meta_text = std::fs::read_to_string(&meta_path)?;
646 let mut meta: BucketMeta =
647 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
648 let mut snap = BucketSnapshot {
649 meta: meta.clone(),
650 objects: BTreeMap::new(),
651 object_versions: BTreeMap::new(),
652 subresources: BTreeMap::new(),
653 multipart_uploads: BTreeMap::new(),
654 };
655
656 for kind in ALL_SUBRESOURCES {
657 let fname = Self::subresource_filename(*kind);
658 let path = bdir.join(fname);
659 if path.exists() {
660 let text = std::fs::read_to_string(&path)?;
661 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none()
662 {
663 let stripped = text.trim();
664 if !stripped.is_empty() {
665 snap.meta.versioning = Some(stripped.to_string());
666 meta.versioning = snap.meta.versioning.clone();
667 }
668 }
669 snap.subresources.insert(fname.to_string(), text);
670 }
671 }
672
673 let objects_root = bdir.join("objects");
674 if objects_root.exists() {
675 for okey_entry in std::fs::read_dir(&objects_root)? {
676 let okey_entry = okey_entry?;
677 if !okey_entry.file_type()?.is_dir() {
678 continue;
679 }
680 let key_dir = okey_entry.path();
681 let mut versioned: Vec<LoadedObject> = Vec::new();
682 let mut key_name: Option<String> = None;
683 for version_entry in std::fs::read_dir(&key_dir)? {
684 let version_entry = version_entry?;
685 let path = version_entry.path();
686 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
687 continue;
688 };
689 if !fname.ends_with(".toml") {
690 continue;
691 }
692 let version_tag = &fname[..fname.len() - 5];
693 let toml_text = std::fs::read_to_string(&path)?;
694 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
695 .map_err(|e| StoreError::Serde(e.to_string()))?;
696 let bin_path = key_dir.join(format!("{}.bin", version_tag));
697 let (body, size) = if obj_meta.is_delete_marker {
698 (BodyRef::Memory(Bytes::new()), 0u64)
699 } else if bin_path.exists() {
700 let sz = std::fs::metadata(&bin_path)?.len();
701 (
702 BodyRef::Disk {
703 bucket: meta.name.clone(),
704 key: obj_meta.key.clone(),
705 version: if version_tag == "null" {
706 None
707 } else {
708 Some(version_tag.to_string())
709 },
710 path: bin_path,
711 size: sz,
712 },
713 sz,
714 )
715 } else {
716 return Err(StoreError::Other(format!(
721 "missing body file: {}",
722 bin_path.display()
723 )));
724 };
725 let _ = size;
726 key_name.get_or_insert_with(|| obj_meta.key.clone());
727 if version_tag == "null" && obj_meta.version_id.is_none() {
728 snap.objects.insert(
729 obj_meta.key.clone(),
730 LoadedObject {
731 meta: obj_meta,
732 body,
733 },
734 );
735 } else {
736 versioned.push(LoadedObject {
737 meta: obj_meta,
738 body,
739 });
740 }
741 }
742 if !versioned.is_empty() {
743 versioned.sort_by_key(|v| v.meta.last_modified);
744 if let Some(key) = key_name {
745 match versioned.last() {
752 Some(newest) if newest.meta.is_delete_marker => {
753 snap.objects.remove(&key);
754 }
755 Some(newest) => {
756 snap.objects.insert(key.clone(), newest.clone());
757 }
758 None => {}
759 }
760 snap.object_versions.insert(key, versioned);
761 }
762 }
763 }
764 }
765
766 let mpu_root = bdir.join("mpu");
767 if mpu_root.exists() {
768 for upload_entry in std::fs::read_dir(&mpu_root)? {
769 let upload_entry = upload_entry?;
770 if !upload_entry.file_type()?.is_dir() {
771 continue;
772 }
773 let upload_dir = upload_entry.path();
774 let init_path = upload_dir.join("init.toml");
775 if !init_path.exists() {
776 continue;
777 }
778 let init_text = std::fs::read_to_string(&init_path)?;
779 let init: MpuInit = toml::from_str(&init_text)
780 .map_err(|e| StoreError::Serde(e.to_string()))?;
781 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
782 let parts_dir = upload_dir.join("parts");
783 if parts_dir.exists() {
784 for part_entry in std::fs::read_dir(&parts_dir)? {
785 let part_entry = part_entry?;
786 let path = part_entry.path();
787 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
788 continue;
789 };
790 if !fname.ends_with(".toml") {
791 continue;
792 }
793 let stem = &fname[..fname.len() - 5];
794 let Ok(part_number) = stem.parse::<u32>() else {
795 continue;
796 };
797 let toml_text = std::fs::read_to_string(&path)?;
798 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
799 .map_err(|e| StoreError::Serde(e.to_string()))?;
800 let bin_path = parts_dir.join(format!("{}.bin", part_number));
801 if !bin_path.exists() {
802 return Err(StoreError::Other(format!(
803 "missing multipart part body file: {}",
804 bin_path.display()
805 )));
806 }
807 let sz = std::fs::metadata(&bin_path)?.len();
808 let body = BodyRef::Disk {
809 bucket: meta.name.clone(),
810 key: format!("__mpu__/{}", init.upload_id),
811 version: Some(format!("part-{}", part_number)),
812 path: bin_path,
813 size: sz,
814 };
815 loaded_parts.insert(
816 part_number,
817 LoadedPart {
818 meta: part_meta,
819 body,
820 },
821 );
822 }
823 }
824 snap.multipart_uploads.insert(
825 init.upload_id.clone(),
826 LoadedMpu {
827 init,
828 parts: loaded_parts,
829 },
830 );
831 }
832 }
833
834 Ok(Some(snap))
835 })();
836 match loaded {
837 Ok(Some(snap)) => {
838 state.buckets.insert(snap.meta.name.clone(), snap);
839 }
840 Ok(None) => {}
841 Err(e) => tracing::warn!(
842 bucket = %bdir.display(),
843 error = %e,
844 "skipping unreadable S3 bucket during load"
845 ),
846 }
847 }
848 Ok(state)
849 }
850
851 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
852 let dir = self.bucket_dir(bucket);
853 std::fs::create_dir_all(&dir)?;
854 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
855 Ok(())
856 }
857
858 fn put_bucket_subresource(
859 &self,
860 bucket: &str,
861 kind: BucketSubresource,
862 payload: &str,
863 ) -> StoreResult<()> {
864 let dir = self.bucket_dir(bucket);
865 std::fs::create_dir_all(&dir)?;
866 let path = dir.join(Self::subresource_filename(kind));
867 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
868 Ok(())
869 }
870
871 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
872 let path = self
873 .bucket_dir(bucket)
874 .join(Self::subresource_filename(kind));
875 match std::fs::remove_file(&path) {
876 Ok(_) => Ok(()),
877 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
878 Err(e) => Err(e.into()),
879 }
880 }
881
882 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
883 let dir = self.bucket_dir(bucket);
884 match std::fs::remove_dir_all(&dir) {
885 Ok(_) => Ok(()),
886 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
887 Err(e) => Err(e.into()),
888 }
889 }
890
891 fn put_object(
892 &self,
893 bucket: &str,
894 key: &str,
895 version: Option<&str>,
896 body: BodySource,
897 meta: &ObjectMeta,
898 ) -> StoreResult<BodyRef> {
899 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
900 std::fs::create_dir_all(&dir)?;
901
902 if meta.is_delete_marker {
903 crate::atomic::write_atomic_toml(&toml_path, meta)?;
904 return Ok(BodyRef::Memory(Bytes::new()));
905 }
906
907 let size: u64;
908 let bytes_for_cache: Option<Bytes>;
909 match body {
910 BodySource::Bytes(b) => {
911 size = b.len() as u64;
912 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
913 bytes_for_cache = Some(b);
914 }
915 BodySource::File(src) => {
916 let src_size = std::fs::metadata(&src)?.len();
917 size = src_size;
918 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
919 bytes_for_cache = None;
920 }
921 BodySource::FileCopy(src) => {
922 let src_size = std::fs::metadata(&src)?.len();
923 size = src_size;
924 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
925 bytes_for_cache = None;
926 }
927 }
928
929 crate::atomic::write_atomic_toml(&toml_path, meta)?;
930
931 let body_key = crate::cache::BodyKey::new(
932 bucket.to_string(),
933 key.to_string(),
934 version.map(|s| s.to_string()),
935 );
936 if let Some(b) = bytes_for_cache {
937 self.cache.insert(body_key, b);
938 } else {
939 self.cache.invalidate(&crate::cache::BodyKey::new(
940 bucket.to_string(),
941 key.to_string(),
942 version.map(|s| s.to_string()),
943 ));
944 }
945
946 Ok(BodyRef::Disk {
947 bucket: bucket.to_string(),
948 key: key.to_string(),
949 version: version.map(|s| s.to_string()),
950 path: bin_path,
951 size,
952 })
953 }
954
955 fn put_object_meta(
956 &self,
957 bucket: &str,
958 key: &str,
959 version: Option<&str>,
960 meta: &ObjectMeta,
961 ) -> StoreResult<()> {
962 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
963 std::fs::create_dir_all(&dir)?;
964 crate::atomic::write_atomic_toml(&toml_path, meta)?;
965 Ok(())
966 }
967
968 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
969 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
970 for p in [&bin_path, &toml_path] {
971 match std::fs::remove_file(p) {
972 Ok(_) => {}
973 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
974 Err(e) => return Err(e.into()),
975 }
976 }
977 Self::cleanup_empty(&dir);
978
979 self.cache.invalidate(&crate::cache::BodyKey::new(
980 bucket.to_string(),
981 key.to_string(),
982 version.map(|s| s.to_string()),
983 ));
984 Ok(())
985 }
986
987 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
988 match body {
989 BodyRef::Memory(b) => Ok(b.clone()),
990 BodyRef::Disk {
991 bucket,
992 key,
993 version,
994 path,
995 size: _,
996 } => {
997 let body_key =
998 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
999 if let Some(bytes) = self.cache.get(&body_key) {
1000 return Ok(bytes);
1001 }
1002 let bytes = Bytes::from(std::fs::read(path)?);
1003 self.cache.insert(body_key, bytes.clone());
1004 Ok(bytes)
1005 }
1006 }
1007 }
1008
1009 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
1010 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1011 std::fs::create_dir_all(&parts_dir)?;
1012 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
1013 crate::atomic::write_atomic_toml(&init_path, init)?;
1014 Ok(())
1015 }
1016
1017 fn mpu_put_part(
1018 &self,
1019 bucket: &str,
1020 upload_id: &str,
1021 part_number: u32,
1022 body: BodySource,
1023 etag: &str,
1024 ) -> StoreResult<BodyRef> {
1025 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1026 std::fs::create_dir_all(&parts_dir)?;
1027 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
1028 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
1029
1030 let size: u64 = match body {
1031 BodySource::Bytes(b) => {
1032 let n = b.len() as u64;
1033 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
1034 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
1035 self.cache.insert(cache_key, b);
1036 n
1037 }
1038 BodySource::File(src) => {
1039 let n = std::fs::metadata(&src)?.len();
1040 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
1041 self.cache
1042 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1043 n
1044 }
1045 BodySource::FileCopy(src) => {
1046 let n = std::fs::metadata(&src)?.len();
1047 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
1048 self.cache
1049 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
1050 n
1051 }
1052 };
1053
1054 let meta = UploadPartMeta {
1055 part_number,
1056 etag: etag.to_string(),
1057 size,
1058 last_modified: Utc::now(),
1059 };
1060 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
1061 Ok(BodyRef::Disk {
1064 bucket: bucket.to_string(),
1065 key: format!("__mpu__/{upload_id}/part-{part_number:05}"),
1066 version: None,
1067 path: bin_path,
1068 size,
1069 })
1070 }
1071
1072 fn spool_dir(&self) -> Option<std::path::PathBuf> {
1073 let dir = self.root.join(".spool");
1074 let _ = std::fs::create_dir_all(&dir);
1078 Some(dir)
1079 }
1080
1081 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
1082 let dir = self.mpu_dir(bucket, upload_id);
1083 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
1085 for entry in entries.flatten() {
1086 let path = entry.path();
1087 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
1088 if let Some(stem) = fname.strip_suffix(".bin") {
1089 if let Ok(n) = stem.parse::<u32>() {
1090 self.cache
1091 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1092 }
1093 }
1094 }
1095 }
1096 }
1097 match std::fs::remove_dir_all(&dir) {
1098 Ok(_) => Ok(()),
1099 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1100 Err(e) => Err(e.into()),
1101 }
1102 }
1103
1104 fn mpu_complete(
1105 &self,
1106 bucket: &str,
1107 upload_id: &str,
1108 final_key: &str,
1109 version: Option<&str>,
1110 meta: &ObjectMeta,
1111 ) -> StoreResult<BodyRef> {
1112 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1113 if !parts_dir.exists() {
1114 return Err(io_other(format!(
1115 "mpu_complete: no parts dir for upload {}",
1116 upload_id
1117 )));
1118 }
1119
1120 let mut part_numbers: Vec<u32> = Vec::new();
1122 for entry in std::fs::read_dir(&parts_dir)? {
1123 let entry = entry?;
1124 let path = entry.path();
1125 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1126 continue;
1127 };
1128 if let Some(stem) = fname.strip_suffix(".toml") {
1129 if let Ok(n) = stem.parse::<u32>() {
1130 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1131 part_numbers.push(n);
1132 }
1133 }
1134 }
1135 }
1136 part_numbers.sort_unstable();
1137 if part_numbers.is_empty() {
1138 return Err(io_other(format!(
1139 "mpu_complete: upload {} has no parts",
1140 upload_id
1141 )));
1142 }
1143
1144 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1145 std::fs::create_dir_all(&dir)?;
1146
1147 let total_size: u64 = if part_numbers.len() == 1 {
1148 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1149 let sz = std::fs::metadata(&only)?.len();
1150 match std::fs::rename(&only, &bin_path) {
1151 Ok(_) => {}
1152 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1153 {
1155 let mut input = std::fs::File::open(&only)?;
1156 let tmp = {
1157 let mut os = bin_path.as_os_str().to_owned();
1158 os.push(".tmp");
1159 PathBuf::from(os)
1160 };
1161 let mut out = std::fs::OpenOptions::new()
1162 .write(true)
1163 .create(true)
1164 .truncate(true)
1165 .open(&tmp)?;
1166 std::io::copy(&mut input, &mut out)?;
1167 out.sync_all()?;
1168 std::fs::rename(&tmp, &bin_path)?;
1169 }
1170 let _ = std::fs::remove_file(&only);
1171 }
1172 Err(e) => return Err(e.into()),
1173 }
1174 if let Some(parent) = bin_path.parent() {
1179 if let Ok(dir_handle) = std::fs::File::open(parent) {
1180 let _ = dir_handle.sync_all();
1181 }
1182 }
1183 sz
1184 } else {
1185 let tmp = {
1186 let mut os = bin_path.as_os_str().to_owned();
1187 os.push(".tmp");
1188 PathBuf::from(os)
1189 };
1190 let mut total: u64 = 0;
1191 {
1192 let mut out = std::fs::OpenOptions::new()
1193 .write(true)
1194 .create(true)
1195 .truncate(true)
1196 .open(&tmp)?;
1197 for n in &part_numbers {
1198 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1199 let mut input = std::fs::File::open(&part_path)?;
1200 let copied = std::io::copy(&mut input, &mut out)?;
1201 total += copied;
1202 }
1203 out.sync_all()?;
1204 }
1205 std::fs::rename(&tmp, &bin_path)?;
1206 if let Some(parent) = bin_path.parent() {
1207 if let Ok(dir_handle) = std::fs::File::open(parent) {
1208 let _ = dir_handle.sync_all();
1209 }
1210 }
1211 total
1212 };
1213
1214 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1215
1216 for n in &part_numbers {
1218 self.cache
1219 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1220 }
1221 let mpu_dir = self.mpu_dir(bucket, upload_id);
1222 let _ = std::fs::remove_dir_all(&mpu_dir);
1223
1224 self.cache.invalidate(&crate::cache::BodyKey::new(
1229 bucket.to_string(),
1230 final_key.to_string(),
1231 version.map(|s| s.to_string()),
1232 ));
1233
1234 Ok(BodyRef::Disk {
1235 bucket: bucket.to_string(),
1236 key: final_key.to_string(),
1237 version: version.map(|s| s.to_string()),
1238 path: bin_path,
1239 size: total_size,
1240 })
1241 }
1242}
1243
/// Returns the raw OS error code reported when a rename crosses a
/// filesystem/volume boundary.
///
/// The value is compared against `io::Error::raw_os_error()` after a failed
/// `std::fs::rename` to decide whether to fall back to copy-then-rename.
///
/// * Unix-like targets: `EXDEV`, which is 18 on Linux, macOS and the BSDs.
/// * Windows: `ERROR_NOT_SAME_DEVICE`, which is 17. Returning the POSIX value
///   there would mean the cross-volume fallback is never taken.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        // ERROR_NOT_SAME_DEVICE
        17
    } else {
        // EXDEV
        18
    }
}
1247
1248#[cfg(test)]
1249mod disk_tests {
1250 use super::*;
1251 use std::sync::Arc;
1252 use tempfile::TempDir;
1253
1254 fn new_store(tmp: &TempDir) -> DiskS3Store {
1255 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1256 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1257 }
1258
1259 fn new_store_with_cache(
1260 tmp: &TempDir,
1261 cap: u64,
1262 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1263 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1264 (
1265 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1266 cache,
1267 )
1268 }
1269
1270 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1271 ObjectMeta {
1272 key: key.to_string(),
1273 content_type: "application/octet-stream".to_string(),
1274 etag: "etag".to_string(),
1275 size,
1276 ..Default::default()
1277 }
1278 }
1279
1280 #[test]
1281 fn put_bucket_meta_roundtrip() {
1282 let tmp = TempDir::new().unwrap();
1283 let store = new_store(&tmp);
1284 let meta = BucketMeta {
1285 name: "b1".to_string(),
1286 region: "us-east-1".to_string(),
1287 versioning: Some("Enabled".to_string()),
1288 ..Default::default()
1289 };
1290 store.put_bucket_meta("b1", &meta).unwrap();
1291 let loaded = store.load().unwrap();
1292 let snap = loaded.buckets.get("b1").unwrap();
1293 assert_eq!(snap.meta.name, "b1");
1294 assert_eq!(snap.meta.region, "us-east-1");
1295 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1296 }
1297
1298 #[test]
1299 fn put_bucket_subresource_each_variant_writes_file() {
1300 let tmp = TempDir::new().unwrap();
1301 let store = new_store(&tmp);
1302 store
1303 .put_bucket_meta(
1304 "b",
1305 &BucketMeta {
1306 name: "b".to_string(),
1307 ..Default::default()
1308 },
1309 )
1310 .unwrap();
1311 let variants = [
1312 BucketSubresource::Tags,
1313 BucketSubresource::Lifecycle,
1314 BucketSubresource::Cors,
1315 BucketSubresource::Policy,
1316 BucketSubresource::Notification,
1317 BucketSubresource::Logging,
1318 BucketSubresource::Website,
1319 BucketSubresource::PublicAccessBlock,
1320 BucketSubresource::ObjectLock,
1321 BucketSubresource::Replication,
1322 BucketSubresource::Ownership,
1323 BucketSubresource::Inventory,
1324 BucketSubresource::Encryption,
1325 BucketSubresource::Versioning,
1326 BucketSubresource::Acl,
1327 BucketSubresource::Accelerate,
1328 ];
1329 for v in variants {
1330 store
1331 .put_bucket_subresource("b", v, "payload=true")
1332 .unwrap();
1333 let file = store
1334 .bucket_dir("b")
1335 .join(DiskS3Store::subresource_filename(v));
1336 assert!(file.exists(), "{:?}", v);
1337 assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1338 }
1339 }
1340
1341 #[test]
1342 fn delete_bucket_subresource_removes_file() {
1343 let tmp = TempDir::new().unwrap();
1344 let store = new_store(&tmp);
1345 store
1346 .put_bucket_meta(
1347 "b",
1348 &BucketMeta {
1349 name: "b".to_string(),
1350 ..Default::default()
1351 },
1352 )
1353 .unwrap();
1354 store
1355 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1356 .unwrap();
1357 store
1358 .delete_bucket_subresource("b", BucketSubresource::Tags)
1359 .unwrap();
1360 let file = store.bucket_dir("b").join("tags.toml");
1361 assert!(!file.exists());
1362 store
1364 .delete_bucket_subresource("b", BucketSubresource::Tags)
1365 .unwrap();
1366 }
1367
1368 #[test]
1369 fn put_object_bytes_roundtrip() {
1370 let tmp = TempDir::new().unwrap();
1371 let store = new_store(&tmp);
1372 store
1373 .put_bucket_meta(
1374 "b",
1375 &BucketMeta {
1376 name: "b".to_string(),
1377 ..Default::default()
1378 },
1379 )
1380 .unwrap();
1381 let data = Bytes::from_static(b"hello world");
1382 let meta = sample_meta("k1", data.len() as u64);
1383 let body_ref = store
1384 .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1385 .unwrap();
1386 match &body_ref {
1387 BodyRef::Disk {
1388 bucket,
1389 key,
1390 size,
1391 path,
1392 ..
1393 } => {
1394 assert_eq!(bucket, "b");
1395 assert_eq!(key, "k1");
1396 assert_eq!(*size, data.len() as u64);
1397 assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1398 }
1399 _ => panic!("expected Disk"),
1400 }
1401 let loaded = store.load().unwrap();
1402 let snap = loaded.buckets.get("b").unwrap();
1403 let obj = snap.objects.get("k1").unwrap();
1404 assert_eq!(obj.meta.size, data.len() as u64);
1405 }
1406
1407 #[test]
1408 fn put_object_file_copy_source_leaves_src_in_place() {
1409 let tmp = TempDir::new().unwrap();
1410 let store = new_store(&tmp);
1411 store
1412 .put_bucket_meta(
1413 "b",
1414 &BucketMeta {
1415 name: "b".to_string(),
1416 ..Default::default()
1417 },
1418 )
1419 .unwrap();
1420 let src_dir = TempDir::new().unwrap();
1421 let src = src_dir.path().join("src.bin");
1422 std::fs::write(&src, b"copied-body").unwrap();
1423 let meta = sample_meta("k", 11);
1424 let bref = store
1425 .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1426 .unwrap();
1427 match bref {
1428 BodyRef::Disk { path, size, .. } => {
1429 assert_eq!(size, 11);
1430 assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1431 }
1432 _ => panic!("expected Disk bodyref"),
1433 }
1434 assert!(src.exists(), "source file must not be moved by FileCopy");
1435 assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1436 }
1437
1438 #[test]
1439 fn put_object_file_source() {
1440 let tmp = TempDir::new().unwrap();
1441 let store = new_store(&tmp);
1442 store
1443 .put_bucket_meta(
1444 "b",
1445 &BucketMeta {
1446 name: "b".to_string(),
1447 ..Default::default()
1448 },
1449 )
1450 .unwrap();
1451 let src = tmp.path().join("src.bin");
1452 std::fs::write(&src, b"file-body").unwrap();
1453 let meta = sample_meta("k", 9);
1454 let body_ref = store
1455 .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1456 .unwrap();
1457 let path = match body_ref {
1458 BodyRef::Disk { path, .. } => path,
1459 _ => panic!(),
1460 };
1461 assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1462 }
1463
1464 #[test]
1465 fn put_object_meta_only_keeps_bin() {
1466 let tmp = TempDir::new().unwrap();
1467 let store = new_store(&tmp);
1468 store
1469 .put_bucket_meta(
1470 "b",
1471 &BucketMeta {
1472 name: "b".to_string(),
1473 ..Default::default()
1474 },
1475 )
1476 .unwrap();
1477 let data = Bytes::from_static(b"abc");
1478 let mut meta = sample_meta("k", 3);
1479 store
1480 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1481 .unwrap();
1482 let (_, bin, _) = store.object_paths("b", "k", None);
1483 let before = std::fs::read(&bin).unwrap();
1484 meta.tags.insert("x".to_string(), "y".to_string());
1485 store.put_object_meta("b", "k", None, &meta).unwrap();
1486 assert_eq!(std::fs::read(&bin).unwrap(), before);
1487 let loaded = store.load().unwrap();
1488 let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1489 assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1490 }
1491
1492 #[test]
1493 fn delete_object_cleans_up_files_and_cache() {
1494 let tmp = TempDir::new().unwrap();
1495 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1496 store
1497 .put_bucket_meta(
1498 "b",
1499 &BucketMeta {
1500 name: "b".to_string(),
1501 ..Default::default()
1502 },
1503 )
1504 .unwrap();
1505 let data = Bytes::from_static(b"bye");
1506 let meta = sample_meta("k", 3);
1507 store
1508 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1509 .unwrap();
1510 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1511 assert!(cache.get(&body_key).is_some());
1512 store.delete_object("b", "k", None).unwrap();
1513 let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1514 assert!(!bin.exists());
1515 assert!(!toml_path.exists());
1516 assert!(!dir.exists());
1517 assert!(cache.get(&body_key).is_none());
1518 }
1519
1520 #[test]
1521 fn open_object_body_cache_hit_and_refill() {
1522 let tmp = TempDir::new().unwrap();
1523 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1524 store
1525 .put_bucket_meta(
1526 "b",
1527 &BucketMeta {
1528 name: "b".to_string(),
1529 ..Default::default()
1530 },
1531 )
1532 .unwrap();
1533 let data = Bytes::from_static(b"payload");
1534 let meta = sample_meta("k", data.len() as u64);
1535 let body_ref = store
1536 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1537 .unwrap();
1538 let got = store.open_object_body(&body_ref).unwrap();
1540 assert_eq!(got, data);
1541 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1543 cache.invalidate(&body_key);
1544 assert!(cache.get(&body_key).is_none());
1545 let got = store.open_object_body(&body_ref).unwrap();
1546 assert_eq!(got, data);
1547 assert!(cache.get(&body_key).is_some());
1548 }
1549
1550 #[test]
1551 fn open_object_body_large_bypasses_cache() {
1552 let tmp = TempDir::new().unwrap();
1553 let (store, cache) = new_store_with_cache(&tmp, 1024);
1555 store
1556 .put_bucket_meta(
1557 "b",
1558 &BucketMeta {
1559 name: "b".to_string(),
1560 ..Default::default()
1561 },
1562 )
1563 .unwrap();
1564 let data = Bytes::from(vec![7u8; 800]);
1565 let meta = sample_meta("big", 800);
1566 let body_ref = store
1567 .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1568 .unwrap();
1569 let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1570 assert!(cache.get(&body_key).is_none());
1571 let got = store.open_object_body(&body_ref).unwrap();
1572 assert_eq!(got, data);
1573 assert!(cache.get(&body_key).is_none());
1575 }
1576
1577 #[test]
1578 fn load_empty_dir() {
1579 let tmp = TempDir::new().unwrap();
1580 let store = new_store(&tmp);
1581 let s = store.load().unwrap();
1582 assert!(s.buckets.is_empty());
1583 }
1584
1585 #[test]
1586 fn load_skips_mpu_without_init() {
1587 let tmp = TempDir::new().unwrap();
1588 let store = new_store(&tmp);
1589 store
1590 .put_bucket_meta(
1591 "b",
1592 &BucketMeta {
1593 name: "b".to_string(),
1594 ..Default::default()
1595 },
1596 )
1597 .unwrap();
1598 let data = Bytes::from_static(b"x");
1599 let meta = sample_meta("k", 1);
1600 store
1601 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1602 .unwrap();
1603 let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1605 std::fs::create_dir_all(&mpu).unwrap();
1606
1607 let loaded = store.load().unwrap();
1608 let snap = loaded.buckets.get("b").unwrap();
1609 assert_eq!(snap.objects.len(), 1);
1610 assert!(snap.multipart_uploads.is_empty());
1611 }
1612
1613 #[test]
1614 fn load_reads_bucket_subresources() {
1615 let tmp = TempDir::new().unwrap();
1616 let store = new_store(&tmp);
1617 store
1618 .put_bucket_meta(
1619 "b",
1620 &BucketMeta {
1621 name: "b".to_string(),
1622 ..Default::default()
1623 },
1624 )
1625 .unwrap();
1626 store
1627 .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1628 .unwrap();
1629 store
1630 .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1631 .unwrap();
1632 store
1633 .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1634 .unwrap();
1635 store
1636 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1637 .unwrap();
1638 let loaded = store.load().unwrap();
1639 let snap = loaded.buckets.get("b").unwrap();
1640 assert_eq!(
1641 snap.subresources.get("lifecycle.toml").map(String::as_str),
1642 Some("<Lifecycle/>"),
1643 );
1644 assert_eq!(
1645 snap.subresources.get("cors.toml").map(String::as_str),
1646 Some("<Cors/>"),
1647 );
1648 assert_eq!(
1649 snap.subresources.get("policy.toml").map(String::as_str),
1650 Some("{}"),
1651 );
1652 assert_eq!(
1653 snap.subresources.get("tags.toml").map(String::as_str),
1654 Some("x=1"),
1655 );
1656 }
1657
1658 #[test]
1659 fn versioned_put_load_roundtrip() {
1660 let tmp = TempDir::new().unwrap();
1661 let store = new_store(&tmp);
1662 store
1663 .put_bucket_meta(
1664 "b",
1665 &BucketMeta {
1666 name: "b".to_string(),
1667 versioning: Some("Enabled".to_string()),
1668 ..Default::default()
1669 },
1670 )
1671 .unwrap();
1672 let base = chrono::Utc::now();
1673 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1674 .iter()
1675 .enumerate()
1676 {
1677 let mut m = sample_meta("k", body.len() as u64);
1678 m.version_id = Some((*vid).to_string());
1679 m.last_modified = base + chrono::Duration::seconds(i as i64);
1680 store
1681 .put_object(
1682 "b",
1683 "k",
1684 Some(*vid),
1685 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1686 &m,
1687 )
1688 .unwrap();
1689 }
1690 let loaded = store.load().unwrap();
1691 let snap = loaded.buckets.get("b").unwrap();
1692 let versions = snap.object_versions.get("k").expect("versions present");
1693 assert_eq!(versions.len(), 3);
1694 assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1695 assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1696 assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1697 for v in versions {
1698 match &v.body {
1699 BodyRef::Disk { size, .. } => assert!(*size > 0),
1700 _ => panic!("expected Disk body"),
1701 }
1702 }
1703 }
1704
1705 #[test]
1706 fn versioned_load_promotes_latest_live_to_snap_objects() {
1707 let tmp = TempDir::new().unwrap();
1708 let store = new_store(&tmp);
1709 store
1710 .put_bucket_meta(
1711 "b",
1712 &BucketMeta {
1713 name: "b".to_string(),
1714 versioning: Some("Enabled".to_string()),
1715 ..Default::default()
1716 },
1717 )
1718 .unwrap();
1719 let base = chrono::Utc::now();
1720 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1721 .iter()
1722 .enumerate()
1723 {
1724 let mut m = sample_meta("k", body.len() as u64);
1725 m.version_id = Some((*vid).to_string());
1726 m.last_modified = base + chrono::Duration::seconds(i as i64);
1727 store
1728 .put_object(
1729 "b",
1730 "k",
1731 Some(*vid),
1732 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1733 &m,
1734 )
1735 .unwrap();
1736 }
1737 let loaded = store.load().unwrap();
1738 let snap = loaded.buckets.get("b").unwrap();
1739 let current = snap.objects.get("k").expect("current object promoted");
1740 assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1741 }
1742
1743 #[test]
1744 fn versioned_load_trailing_delete_marker_hides_current() {
1745 let tmp = TempDir::new().unwrap();
1746 let store = new_store(&tmp);
1747 store
1748 .put_bucket_meta(
1749 "b",
1750 &BucketMeta {
1751 name: "b".to_string(),
1752 versioning: Some("Enabled".to_string()),
1753 ..Default::default()
1754 },
1755 )
1756 .unwrap();
1757 let base = chrono::Utc::now();
1758 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1759 .iter()
1760 .enumerate()
1761 {
1762 let mut m = sample_meta("k", body.len() as u64);
1763 m.version_id = Some((*vid).to_string());
1764 m.last_modified = base + chrono::Duration::seconds(i as i64);
1765 store
1766 .put_object(
1767 "b",
1768 "k",
1769 Some(*vid),
1770 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1771 &m,
1772 )
1773 .unwrap();
1774 }
1775 let mut dm = sample_meta("k", 0);
1777 dm.version_id = Some("dm1".to_string());
1778 dm.is_delete_marker = true;
1779 dm.last_modified = base + chrono::Duration::seconds(10);
1780 store
1781 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1782 .unwrap();
1783 let loaded = store.load().unwrap();
1784 let snap = loaded.buckets.get("b").unwrap();
1785 assert!(
1786 !snap.objects.contains_key("k"),
1787 "trailing delete marker must hide current object",
1788 );
1789 assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1790 }
1791
1792 #[test]
1793 fn legacy_null_object_overridden_by_newer_versions() {
1794 let tmp = TempDir::new().unwrap();
1797 let store = new_store(&tmp);
1798 store
1799 .put_bucket_meta(
1800 "b",
1801 &BucketMeta {
1802 name: "b".to_string(),
1803 ..Default::default()
1804 },
1805 )
1806 .unwrap();
1807 let base = chrono::Utc::now();
1808 let mut null_meta = sample_meta("k", 3);
1809 null_meta.last_modified = base;
1810 store
1811 .put_object(
1812 "b",
1813 "k",
1814 None,
1815 BodySource::Bytes(Bytes::from_static(b"old")),
1816 &null_meta,
1817 )
1818 .unwrap();
1819 store
1821 .put_bucket_meta(
1822 "b",
1823 &BucketMeta {
1824 name: "b".to_string(),
1825 versioning: Some("Enabled".to_string()),
1826 ..Default::default()
1827 },
1828 )
1829 .unwrap();
1830 let mut v1 = sample_meta("k", 3);
1831 v1.version_id = Some("v1".to_string());
1832 v1.last_modified = base + chrono::Duration::seconds(1);
1833 store
1834 .put_object(
1835 "b",
1836 "k",
1837 Some("v1"),
1838 BodySource::Bytes(Bytes::from_static(b"new")),
1839 &v1,
1840 )
1841 .unwrap();
1842 let mut v2 = sample_meta("k", 5);
1843 v2.version_id = Some("v2".to_string());
1844 v2.last_modified = base + chrono::Duration::seconds(2);
1845 store
1846 .put_object(
1847 "b",
1848 "k",
1849 Some("v2"),
1850 BodySource::Bytes(Bytes::from_static(b"newer")),
1851 &v2,
1852 )
1853 .unwrap();
1854 let loaded = store.load().unwrap();
1855 let snap = loaded.buckets.get("b").unwrap();
1856 let current = snap
1857 .objects
1858 .get("k")
1859 .expect("latest live version must override stale null");
1860 assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1861 assert_eq!(current.meta.size, 5);
1862 }
1863
1864 #[test]
1865 fn legacy_null_hidden_by_trailing_delete_marker() {
1866 let tmp = TempDir::new().unwrap();
1867 let store = new_store(&tmp);
1868 store
1869 .put_bucket_meta(
1870 "b",
1871 &BucketMeta {
1872 name: "b".to_string(),
1873 ..Default::default()
1874 },
1875 )
1876 .unwrap();
1877 let base = chrono::Utc::now();
1878 let mut null_meta = sample_meta("k", 3);
1879 null_meta.last_modified = base;
1880 store
1881 .put_object(
1882 "b",
1883 "k",
1884 None,
1885 BodySource::Bytes(Bytes::from_static(b"old")),
1886 &null_meta,
1887 )
1888 .unwrap();
1889 store
1890 .put_bucket_meta(
1891 "b",
1892 &BucketMeta {
1893 name: "b".to_string(),
1894 versioning: Some("Enabled".to_string()),
1895 ..Default::default()
1896 },
1897 )
1898 .unwrap();
1899 let mut v1 = sample_meta("k", 3);
1900 v1.version_id = Some("v1".to_string());
1901 v1.last_modified = base + chrono::Duration::seconds(1);
1902 store
1903 .put_object(
1904 "b",
1905 "k",
1906 Some("v1"),
1907 BodySource::Bytes(Bytes::from_static(b"new")),
1908 &v1,
1909 )
1910 .unwrap();
1911 let mut dm = sample_meta("k", 0);
1912 dm.version_id = Some("dm1".to_string());
1913 dm.is_delete_marker = true;
1914 dm.last_modified = base + chrono::Duration::seconds(2);
1915 store
1916 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1917 .unwrap();
1918 let loaded = store.load().unwrap();
1919 let snap = loaded.buckets.get("b").unwrap();
1920 assert!(
1921 !snap.objects.contains_key("k"),
1922 "trailing delete marker must hide even a legacy null object",
1923 );
1924 }
1925
1926 #[test]
1927 fn delete_marker_roundtrip_no_body_file() {
1928 let tmp = TempDir::new().unwrap();
1929 let store = new_store(&tmp);
1930 store
1931 .put_bucket_meta(
1932 "b",
1933 &BucketMeta {
1934 name: "b".to_string(),
1935 versioning: Some("Enabled".to_string()),
1936 ..Default::default()
1937 },
1938 )
1939 .unwrap();
1940 let mut m = sample_meta("k", 0);
1941 m.version_id = Some("dm1".to_string());
1942 m.is_delete_marker = true;
1943 store
1944 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1945 .unwrap();
1946 let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1948 assert!(!bin.exists(), "delete marker must not have a .bin file");
1949 assert!(toml_path.exists());
1950
1951 let loaded = store.load().unwrap();
1952 let versions = loaded
1953 .buckets
1954 .get("b")
1955 .unwrap()
1956 .object_versions
1957 .get("k")
1958 .unwrap();
1959 assert_eq!(versions.len(), 1);
1960 assert!(versions[0].meta.is_delete_marker);
1961 match &versions[0].body {
1962 BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1963 _ => panic!("delete marker body should be empty Memory"),
1964 }
1965 }
1966
1967 #[test]
1968 fn mixed_nonversioned_and_versioned_buckets() {
1969 let tmp = TempDir::new().unwrap();
1970 let store = new_store(&tmp);
1971 store
1972 .put_bucket_meta(
1973 "a",
1974 &BucketMeta {
1975 name: "a".to_string(),
1976 ..Default::default()
1977 },
1978 )
1979 .unwrap();
1980 store
1981 .put_bucket_meta(
1982 "b",
1983 &BucketMeta {
1984 name: "b".to_string(),
1985 versioning: Some("Enabled".to_string()),
1986 ..Default::default()
1987 },
1988 )
1989 .unwrap();
1990 let ma = sample_meta("only", 3);
1991 store
1992 .put_object(
1993 "a",
1994 "only",
1995 None,
1996 BodySource::Bytes(Bytes::from_static(b"aaa")),
1997 &ma,
1998 )
1999 .unwrap();
2000 let base = chrono::Utc::now();
2001 for (i, vid) in ["v1", "v2"].iter().enumerate() {
2002 let mut m = sample_meta("twice", 2);
2003 m.version_id = Some((*vid).to_string());
2004 m.last_modified = base + chrono::Duration::seconds(i as i64);
2005 store
2006 .put_object(
2007 "b",
2008 "twice",
2009 Some(*vid),
2010 BodySource::Bytes(Bytes::from_static(b"xx")),
2011 &m,
2012 )
2013 .unwrap();
2014 }
2015 let loaded = store.load().unwrap();
2016 assert_eq!(loaded.buckets.len(), 2);
2017 let a = loaded.buckets.get("a").unwrap();
2018 assert_eq!(a.objects.len(), 1);
2019 assert!(a.object_versions.is_empty());
2020 let b = loaded.buckets.get("b").unwrap();
2021 assert_eq!(
2024 b.objects.get("twice").unwrap().meta.version_id.as_deref(),
2025 Some("v2")
2026 );
2027 assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
2028 }
2029
2030 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
2031 MpuInit {
2032 upload_id: upload_id.to_string(),
2033 key: key.to_string(),
2034 initiated: chrono::Utc::now(),
2035 content_type: "application/octet-stream".to_string(),
2036 storage_class: "STANDARD".to_string(),
2037 ..Default::default()
2038 }
2039 }
2040
2041 #[test]
2042 fn mpu_create_then_load_empty_parts() {
2043 let tmp = TempDir::new().unwrap();
2044 let store = new_store(&tmp);
2045 store
2046 .put_bucket_meta(
2047 "b",
2048 &BucketMeta {
2049 name: "b".to_string(),
2050 ..Default::default()
2051 },
2052 )
2053 .unwrap();
2054 let init = sample_mpu_init("up1", "k1");
2055 store.mpu_create("b", "up1", &init).unwrap();
2056 let loaded = store.load().unwrap();
2057 let snap = loaded.buckets.get("b").unwrap();
2058 let m = snap.multipart_uploads.get("up1").expect("upload present");
2059 assert_eq!(m.init.upload_id, "up1");
2060 assert_eq!(m.init.key, "k1");
2061 assert!(m.parts.is_empty());
2062 }
2063
2064 #[test]
2065 fn mpu_put_part_then_load_three_parts() {
2066 let tmp = TempDir::new().unwrap();
2067 let store = new_store(&tmp);
2068 store
2069 .put_bucket_meta(
2070 "b",
2071 &BucketMeta {
2072 name: "b".to_string(),
2073 ..Default::default()
2074 },
2075 )
2076 .unwrap();
2077 store
2078 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2079 .unwrap();
2080 let bodies = [
2081 Bytes::from_static(b"part-one"),
2082 Bytes::from_static(b"part-two-longer"),
2083 Bytes::from_static(b"p3"),
2084 ];
2085 for (i, body) in bodies.iter().enumerate() {
2086 let n = (i + 1) as u32;
2087 store
2088 .mpu_put_part(
2089 "b",
2090 "up",
2091 n,
2092 BodySource::Bytes(body.clone()),
2093 &format!("et{}", n),
2094 )
2095 .unwrap();
2096 }
2097 let loaded = store.load().unwrap();
2098 let snap = loaded.buckets.get("b").unwrap();
2099 let m = snap.multipart_uploads.get("up").unwrap();
2100 assert_eq!(m.parts.len(), 3);
2101 for (i, body) in bodies.iter().enumerate() {
2102 let n = (i + 1) as u32;
2103 let part = m.parts.get(&n).unwrap();
2104 assert_eq!(part.meta.part_number, n);
2105 assert_eq!(part.meta.size, body.len() as u64);
2106 assert_eq!(part.meta.etag, format!("et{}", n));
2107 match &part.body {
2108 BodyRef::Disk { path, size, .. } => {
2109 assert_eq!(*size, body.len() as u64);
2110 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2111 }
2112 _ => panic!("expected Disk"),
2113 }
2114 }
2115 }
2116
2117 #[test]
2118 fn mpu_abort_removes_upload() {
2119 let tmp = TempDir::new().unwrap();
2120 let store = new_store(&tmp);
2121 store
2122 .put_bucket_meta(
2123 "b",
2124 &BucketMeta {
2125 name: "b".to_string(),
2126 ..Default::default()
2127 },
2128 )
2129 .unwrap();
2130 store
2131 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2132 .unwrap();
2133 store
2134 .mpu_put_part(
2135 "b",
2136 "up",
2137 1,
2138 BodySource::Bytes(Bytes::from_static(b"x")),
2139 "e",
2140 )
2141 .unwrap();
2142 store.mpu_abort("b", "up").unwrap();
2143 assert!(!store.mpu_dir("b", "up").exists());
2144 store.mpu_abort("b", "up").unwrap();
2146 let loaded = store.load().unwrap();
2147 let snap = loaded.buckets.get("b").unwrap();
2148 assert!(snap.multipart_uploads.is_empty());
2149 }
2150
2151 #[test]
2152 fn mpu_complete_single_part_fast_path() {
2153 let tmp = TempDir::new().unwrap();
2154 let store = new_store(&tmp);
2155 store
2156 .put_bucket_meta(
2157 "b",
2158 &BucketMeta {
2159 name: "b".to_string(),
2160 ..Default::default()
2161 },
2162 )
2163 .unwrap();
2164 store
2165 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2166 .unwrap();
2167 let body = Bytes::from_static(b"only-part-bytes");
2168 store
2169 .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2170 .unwrap();
2171 let meta = sample_meta("k", body.len() as u64);
2172 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2173 match &body_ref {
2174 BodyRef::Disk { path, size, .. } => {
2175 assert_eq!(*size, body.len() as u64);
2176 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2177 }
2178 _ => panic!("expected Disk"),
2179 }
2180 assert!(!store.mpu_dir("b", "up").exists());
2181 }
2182
2183 #[test]
2184 fn mpu_complete_multi_part_concat() {
2185 let tmp = TempDir::new().unwrap();
2186 let store = new_store(&tmp);
2187 store
2188 .put_bucket_meta(
2189 "b",
2190 &BucketMeta {
2191 name: "b".to_string(),
2192 ..Default::default()
2193 },
2194 )
2195 .unwrap();
2196 store
2197 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2198 .unwrap();
2199 let p1 = Bytes::from_static(b"AAAA");
2200 let p2 = Bytes::from_static(b"BBBBBB");
2201 let p3 = Bytes::from_static(b"CC");
2202 for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2203 store
2204 .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2205 .unwrap();
2206 }
2207 let mut expected = Vec::new();
2208 expected.extend_from_slice(&p1);
2209 expected.extend_from_slice(&p2);
2210 expected.extend_from_slice(&p3);
2211 let meta = sample_meta("k", expected.len() as u64);
2212 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2213 let path = match body_ref {
2214 BodyRef::Disk { path, size, .. } => {
2215 assert_eq!(size, expected.len() as u64);
2216 path
2217 }
2218 _ => panic!("expected Disk"),
2219 };
2220 assert_eq!(std::fs::read(&path).unwrap(), expected);
2221 assert!(!store.mpu_dir("b", "up").exists());
2222 }
2223
2224 #[test]
2225 fn mpu_complete_large_streaming_via_file_source() {
2226 let tmp = TempDir::new().unwrap();
2227 let store = new_store(&tmp);
2228 store
2229 .put_bucket_meta(
2230 "b",
2231 &BucketMeta {
2232 name: "b".to_string(),
2233 ..Default::default()
2234 },
2235 )
2236 .unwrap();
2237 store
2238 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2239 .unwrap();
2240
2241 const PART_SIZE: usize = 1024 * 1024;
2244 let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2245 let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2246 for (i, byte) in patterns.iter().enumerate() {
2247 let src = tmp.path().join(format!("src-{}.bin", i + 1));
2248 let data = vec![*byte; PART_SIZE];
2249 std::fs::write(&src, &data).unwrap();
2250 expected.extend_from_slice(&data);
2251 store
2252 .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2253 .unwrap();
2254 }
2255 let meta = sample_meta("k", expected.len() as u64);
2256 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2257 let path = match body_ref {
2258 BodyRef::Disk { path, size, .. } => {
2259 assert_eq!(size, expected.len() as u64);
2260 path
2261 }
2262 _ => panic!("expected Disk"),
2263 };
2264 let actual = std::fs::read(&path).unwrap();
2265 assert_eq!(actual.len(), expected.len());
2266 assert_eq!(actual, expected);
2267 assert!(!store.mpu_dir("b", "up").exists());
2268 }
2269
2270 #[test]
2271 fn mpu_resumable_across_load() {
2272 let tmp = TempDir::new().unwrap();
2273 let store = new_store(&tmp);
2274 store
2275 .put_bucket_meta(
2276 "b",
2277 &BucketMeta {
2278 name: "b".to_string(),
2279 ..Default::default()
2280 },
2281 )
2282 .unwrap();
2283 store
2284 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2285 .unwrap();
2286 let p1 = Bytes::from_static(b"hello-");
2287 let p2 = Bytes::from_static(b"world-");
2288 store
2289 .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2290 .unwrap();
2291 store
2292 .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2293 .unwrap();
2294
2295 let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2297 let loaded = store2.load().unwrap();
2298 let snap = loaded.buckets.get("b").unwrap();
2299 let m = snap.multipart_uploads.get("up").unwrap();
2300 assert_eq!(m.parts.len(), 2);
2301 assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2302 assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2303
2304 let p3 = Bytes::from_static(b"again!");
2306 store2
2307 .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2308 .unwrap();
2309 let mut expected = Vec::new();
2310 expected.extend_from_slice(&p1);
2311 expected.extend_from_slice(&p2);
2312 expected.extend_from_slice(&p3);
2313 let meta = sample_meta("k", expected.len() as u64);
2314 let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2315 let path = match body_ref {
2316 BodyRef::Disk { path, .. } => path,
2317 _ => panic!(),
2318 };
2319 assert_eq!(std::fs::read(&path).unwrap(), expected);
2320 assert!(!store2.mpu_dir("b", "up").exists());
2321 }
2322
2323 #[test]
2324 fn tags_snapshot_roundtrip_via_store() {
2325 let tmp = TempDir::new().unwrap();
2326 let store = new_store(&tmp);
2327 store
2328 .put_bucket_meta(
2329 "b",
2330 &BucketMeta {
2331 name: "b".to_string(),
2332 ..Default::default()
2333 },
2334 )
2335 .unwrap();
2336 let mut tags = BTreeMap::new();
2337 tags.insert("env".to_string(), "prod".to_string());
2338 tags.insert("team".to_string(), "s3".to_string());
2339 let snap = TagsSnapshot { tags: tags.clone() };
2340 let payload = toml::to_string(&snap).unwrap();
2341 store
2342 .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2343 .unwrap();
2344 let loaded = store.load().unwrap();
2345 let text = loaded
2346 .buckets
2347 .get("b")
2348 .unwrap()
2349 .subresources
2350 .get("tags.toml")
2351 .cloned()
2352 .unwrap();
2353 let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2354 assert_eq!(decoded.tags, tags);
2355 }
2356
/// Round-trips an `AclSnapshot` through the store.
///
/// A snapshot holding one `CanonicalUser` grant and one `Group` grant is
/// TOML-encoded, written as the `Acl` subresource of bucket `b`, read back
/// from the loaded state under the `acl.toml` key, and decoded. The owner id,
/// the grant count, and one field of each grant are then checked.
#[test]
fn acl_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    // The bucket has to exist before a subresource can be attached to it.
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let owner_grant = AclGrantSnapshot {
        grantee_type: "CanonicalUser".to_string(),
        grantee_id: Some("owner-abc".to_string()),
        grantee_display_name: Some("owner".to_string()),
        grantee_uri: None,
        permission: "FULL_CONTROL".to_string(),
    };
    let all_users_grant = AclGrantSnapshot {
        grantee_type: "Group".to_string(),
        grantee_id: None,
        grantee_display_name: None,
        grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
        permission: "READ".to_string(),
    };
    let acl = AclSnapshot {
        owner_id: "owner-abc".to_string(),
        // Order matters: the assertions below index into the decoded grants.
        grants: vec![owner_grant, all_users_grant],
    };
    let encoded = toml::to_string(&acl).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &encoded)
        .unwrap();

    // Reload from the store and walk down to the stored TOML text.
    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored = bucket.subresources.get("acl.toml").unwrap();
    let decoded: AclSnapshot = toml::from_str(stored).unwrap();

    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
    assert_eq!(decoded.grants[1].grantee_type, "Group");
}
2410
/// Round-trips an `InventorySnapshot` through the store.
///
/// Two inventory configurations keyed by id are TOML-encoded, written as the
/// `Inventory` subresource of bucket `b`, read back from the loaded state
/// under the `inventory.toml` key, and decoded; the decoded map must equal
/// the one that was written.
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    // The bucket has to exist before a subresource can be attached to it.
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let expected_configs: BTreeMap<String, String> = [
        ("inv-1", "<InventoryConfiguration id=\"inv-1\"/>"),
        ("inv-2", "<InventoryConfiguration id=\"inv-2\"/>"),
    ]
    .iter()
    .map(|&(id, xml)| (id.to_string(), xml.to_string()))
    .collect();
    let encoded = toml::to_string(&InventorySnapshot {
        configs: expected_configs.clone(),
    })
    .unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &encoded)
        .unwrap();

    // Reload from the store and walk down to the stored TOML text.
    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    let stored = bucket.subresources.get("inventory.toml").unwrap();
    let decoded: InventorySnapshot = toml::from_str(stored).unwrap();
    assert_eq!(decoded.configs, expected_configs);
}
2452
/// Checks that a `versioning.toml` file dropped into the bucket directory is
/// surfaced by `load` as the bucket's `meta.versioning` value.
#[test]
fn legacy_versioning_file_is_read() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Written behind the store's back; the file body is the bare status
    // string rather than a key/value document.
    let legacy_file = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&legacy_file, "Enabled").unwrap();

    let state = store.load().unwrap();
    let bucket = state.buckets.get("b").unwrap();
    assert_eq!(bucket.meta.versioning.as_deref(), Some("Enabled"));
}
2474
/// Checks that `load` tolerates a bucket whose `meta.toml` cannot be parsed.
///
/// A valid bucket `good` (with one object) is created through the store, and
/// a second bucket directory `bad` is hand-written with an unparseable
/// `meta.toml`. Loading must succeed, include `good`, and omit `bad`.
#[test]
fn load_skips_corrupt_bucket_and_keeps_others() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);

    // Healthy bucket, created through the normal store API.
    let good_meta = BucketMeta {
        name: "good".to_string(),
        ..BucketMeta::default()
    };
    store.put_bucket_meta("good", &good_meta).unwrap();
    let body = BodySource::Bytes(Bytes::from_static(b"hi"));
    let object_meta = sample_meta("k", 2);
    store
        .put_object("good", "k", None, body, &object_meta)
        .unwrap();

    // Corrupt bucket, written directly to disk so the bad metadata is not
    // produced by the store itself.
    let corrupt_dir = store.buckets_dir().join("bad");
    std::fs::create_dir_all(&corrupt_dir).unwrap();
    let corrupt_meta_file = corrupt_dir.join("meta.toml");
    std::fs::write(corrupt_meta_file, b"not valid toml = = =").unwrap();

    let state = store.load().unwrap();
    let buckets = &state.buckets;
    assert!(buckets.contains_key("good"), "valid bucket must load");
    assert!(
        !buckets.contains_key("bad"),
        "corrupt bucket must be skipped, not abort the load"
    );
}
2514}