1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Debug, Default, Clone, Serialize, Deserialize)]
10pub struct S3State {
11 #[serde(default)]
12 pub account_id: String,
13 #[serde(default)]
14 pub region: String,
15 #[serde(default)]
16 pub buckets: HashMap<String, BucketSnapshot>,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct BucketSnapshot {
21 pub meta: BucketMeta,
22 #[serde(default)]
23 pub objects: HashMap<String, LoadedObject>,
24 #[serde(default)]
25 pub object_versions: HashMap<String, Vec<LoadedObject>>,
26 #[serde(default)]
27 pub subresources: HashMap<String, String>,
28 #[serde(default)]
29 pub multipart_uploads: HashMap<String, LoadedMpu>,
30}
31
32#[derive(Debug, Default, Clone, Serialize, Deserialize)]
33pub struct LoadedMpu {
34 pub init: MpuInit,
35 #[serde(default)]
36 pub parts: BTreeMap<u32, LoadedPart>,
37}
38
39#[derive(Debug, Default, Clone, Serialize, Deserialize)]
40pub struct LoadedPart {
41 pub meta: UploadPartMeta,
42 #[serde(default)]
43 pub body: BodyRef,
44}
45
46#[derive(Debug, Default, Clone, Serialize, Deserialize)]
47pub struct LoadedObject {
48 pub meta: ObjectMeta,
49 #[serde(default)]
50 pub body: BodyRef,
51}
52
53#[derive(Debug, Default, Clone, Serialize, Deserialize)]
54pub struct BucketMeta {
55 #[serde(default)]
56 pub name: String,
57 #[serde(default = "default_time")]
58 pub creation_date: DateTime<Utc>,
59 #[serde(default)]
60 pub region: String,
61 #[serde(default)]
62 pub versioning: Option<String>,
63 #[serde(default)]
64 pub acl: Option<String>,
65 #[serde(default)]
66 pub acl_owner_id: String,
67 #[serde(default)]
68 pub accelerate_status: Option<String>,
69 #[serde(default)]
70 pub eventbridge_enabled: bool,
71}
72
73fn default_time() -> DateTime<Utc> {
74 DateTime::<Utc>::MIN_UTC
75}
76
77#[derive(Debug, Default, Clone, Serialize, Deserialize)]
78pub struct AclGrantSnapshot {
79 pub grantee_type: String,
80 #[serde(default)]
81 pub grantee_id: Option<String>,
82 #[serde(default)]
83 pub grantee_display_name: Option<String>,
84 #[serde(default)]
85 pub grantee_uri: Option<String>,
86 pub permission: String,
87}
88
89#[derive(Debug, Default, Clone, Serialize, Deserialize)]
90pub struct TagsSnapshot {
91 #[serde(default)]
92 pub tags: HashMap<String, String>,
93}
94
95#[derive(Debug, Default, Clone, Serialize, Deserialize)]
96pub struct AclSnapshot {
97 #[serde(default)]
98 pub owner_id: String,
99 #[serde(default)]
100 pub grants: Vec<AclGrantSnapshot>,
101}
102
103#[derive(Debug, Default, Clone, Serialize, Deserialize)]
104pub struct InventorySnapshot {
105 #[serde(default)]
106 pub configs: HashMap<String, String>,
107}
108
109#[derive(Debug, Default, Clone, Serialize, Deserialize)]
110pub struct ObjectMeta {
111 #[serde(default)]
112 pub key: String,
113 #[serde(default)]
114 pub content_type: String,
115 #[serde(default)]
116 pub etag: String,
117 #[serde(default)]
118 pub size: u64,
119 #[serde(default = "default_time")]
120 pub last_modified: DateTime<Utc>,
121 #[serde(default)]
122 pub metadata: HashMap<String, String>,
123 #[serde(default)]
124 pub tags: HashMap<String, String>,
125 #[serde(default)]
126 pub storage_class: String,
127 #[serde(default)]
128 pub acl_grants: Vec<AclGrantSnapshot>,
129 #[serde(default)]
130 pub acl_owner_id: Option<String>,
131 #[serde(default)]
132 pub parts_count: Option<u32>,
133 #[serde(default)]
134 pub part_sizes: Option<Vec<(u32, u64)>>,
135 #[serde(default)]
136 pub sse_algorithm: Option<String>,
137 #[serde(default)]
138 pub sse_kms_key_id: Option<String>,
139 #[serde(default)]
140 pub bucket_key_enabled: Option<bool>,
141 #[serde(default)]
142 pub version_id: Option<String>,
143 #[serde(default)]
144 pub is_delete_marker: bool,
145 #[serde(default)]
146 pub restore_ongoing: Option<bool>,
147 #[serde(default)]
148 pub restore_expiry: Option<String>,
149 #[serde(default)]
150 pub checksum_algorithm: Option<String>,
151 #[serde(default)]
152 pub checksum_value: Option<String>,
153 #[serde(default)]
154 pub lock_mode: Option<String>,
155 #[serde(default)]
156 pub lock_retain_until: Option<DateTime<Utc>>,
157 #[serde(default)]
158 pub lock_legal_hold: Option<String>,
159 #[serde(default)]
160 pub content_encoding: Option<String>,
161 #[serde(default)]
162 pub website_redirect_location: Option<String>,
163}
164
165#[derive(Debug, Default, Clone, Serialize, Deserialize)]
166pub struct MpuInit {
167 pub upload_id: String,
168 pub key: String,
169 #[serde(default = "default_time")]
170 pub initiated: DateTime<Utc>,
171 #[serde(default)]
172 pub metadata: HashMap<String, String>,
173 #[serde(default)]
174 pub content_type: String,
175 #[serde(default)]
176 pub storage_class: String,
177 #[serde(default)]
178 pub sse_algorithm: Option<String>,
179 #[serde(default)]
180 pub sse_kms_key_id: Option<String>,
181 #[serde(default)]
182 pub tagging: Option<String>,
183 #[serde(default)]
184 pub acl_grants: Vec<AclGrantSnapshot>,
185 #[serde(default)]
186 pub checksum_algorithm: Option<String>,
187}
188
189#[derive(Debug, Default, Clone, Serialize, Deserialize)]
190pub struct UploadPartMeta {
191 pub part_number: u32,
192 pub etag: String,
193 pub size: u64,
194 #[serde(default = "default_time")]
195 pub last_modified: DateTime<Utc>,
196}
197
198#[derive(Debug, Clone, Copy, PartialEq, Eq)]
199pub enum BucketSubresource {
200 Tags,
201 Lifecycle,
202 Cors,
203 Policy,
204 Notification,
205 Logging,
206 Website,
207 PublicAccessBlock,
208 ObjectLock,
209 Replication,
210 Ownership,
211 Inventory,
212 Encryption,
213 Versioning,
214 Acl,
215 Accelerate,
216}
217
218pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
219 BucketSubresource::Tags,
220 BucketSubresource::Lifecycle,
221 BucketSubresource::Cors,
222 BucketSubresource::Policy,
223 BucketSubresource::Notification,
224 BucketSubresource::Logging,
225 BucketSubresource::Website,
226 BucketSubresource::PublicAccessBlock,
227 BucketSubresource::ObjectLock,
228 BucketSubresource::Replication,
229 BucketSubresource::Ownership,
230 BucketSubresource::Inventory,
231 BucketSubresource::Encryption,
232 BucketSubresource::Versioning,
233 BucketSubresource::Acl,
234 BucketSubresource::Accelerate,
235];
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub enum BodyRef {
239 #[serde(skip)]
240 Memory(Bytes),
241 Disk {
242 bucket: String,
243 key: String,
244 #[serde(default)]
245 version: Option<String>,
246 path: PathBuf,
247 size: u64,
248 },
249}
250
251impl BodyRef {
252 pub fn size(&self) -> u64 {
253 match self {
254 BodyRef::Memory(b) => b.len() as u64,
255 BodyRef::Disk { size, .. } => *size,
256 }
257 }
258}
259
260impl Default for BodyRef {
261 fn default() -> Self {
262 BodyRef::Memory(Bytes::new())
263 }
264}
265
266#[derive(Debug)]
267pub enum BodySource {
268 Bytes(Bytes),
269 File(PathBuf),
272 FileCopy(PathBuf),
276}
277
278#[derive(Debug, Error)]
279pub enum StoreError {
280 #[error("io error: {0}")]
281 Io(#[from] std::io::Error),
282 #[error("serialization error: {0}")]
283 Serde(String),
284 #[error("not supported by this store")]
285 NotSupported,
286 #[error("{0}")]
287 Other(String),
288}
289
290pub type StoreResult<T> = Result<T, StoreError>;
291
292pub trait S3Store: Send + Sync {
293 fn load(&self) -> StoreResult<S3State>;
294
295 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
296 fn put_bucket_subresource(
297 &self,
298 bucket: &str,
299 kind: BucketSubresource,
300 payload: &str,
301 ) -> StoreResult<()>;
302 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
303 fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;
304
305 fn put_object(
306 &self,
307 bucket: &str,
308 key: &str,
309 version: Option<&str>,
310 body: BodySource,
311 meta: &ObjectMeta,
312 ) -> StoreResult<BodyRef>;
313 fn put_object_meta(
314 &self,
315 bucket: &str,
316 key: &str,
317 version: Option<&str>,
318 meta: &ObjectMeta,
319 ) -> StoreResult<()>;
320 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
321 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;
322
323 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
324 fn mpu_put_part(
325 &self,
326 bucket: &str,
327 upload_id: &str,
328 part_number: u32,
329 body: BodySource,
330 etag: &str,
331 ) -> StoreResult<()>;
332 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
333 fn mpu_complete(
334 &self,
335 bucket: &str,
336 upload_id: &str,
337 final_key: &str,
338 version: Option<&str>,
339 meta: &ObjectMeta,
340 ) -> StoreResult<BodyRef>;
341}
342
343pub struct MemoryS3Store;
344
345impl MemoryS3Store {
346 pub fn new() -> Self {
347 Self
348 }
349}
350
351impl Default for MemoryS3Store {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357impl S3Store for MemoryS3Store {
358 fn load(&self) -> StoreResult<S3State> {
359 Ok(S3State::default())
360 }
361
362 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
363 Ok(())
364 }
365 fn put_bucket_subresource(
366 &self,
367 _bucket: &str,
368 _kind: BucketSubresource,
369 _payload: &str,
370 ) -> StoreResult<()> {
371 Ok(())
372 }
373 fn delete_bucket_subresource(
374 &self,
375 _bucket: &str,
376 _kind: BucketSubresource,
377 ) -> StoreResult<()> {
378 Ok(())
379 }
380 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
381 Ok(())
382 }
383
384 fn put_object(
385 &self,
386 _bucket: &str,
387 _key: &str,
388 _version: Option<&str>,
389 body: BodySource,
390 _meta: &ObjectMeta,
391 ) -> StoreResult<BodyRef> {
392 match body {
393 BodySource::Bytes(b) => Ok(BodyRef::Memory(b)),
394 BodySource::File(_) | BodySource::FileCopy(_) => Err(StoreError::Other(
395 "file-backed put not supported in memory mode".to_string(),
396 )),
397 }
398 }
399 fn put_object_meta(
400 &self,
401 _bucket: &str,
402 _key: &str,
403 _version: Option<&str>,
404 _meta: &ObjectMeta,
405 ) -> StoreResult<()> {
406 Ok(())
407 }
408 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
409 Ok(())
410 }
411 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
412 match body {
413 BodyRef::Memory(b) => Ok(b.clone()),
414 BodyRef::Disk { .. } => {
415 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
416 }
417 }
418 }
419
420 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
421 Ok(())
422 }
423 fn mpu_put_part(
424 &self,
425 _bucket: &str,
426 _upload_id: &str,
427 _part_number: u32,
428 _body: BodySource,
429 _etag: &str,
430 ) -> StoreResult<()> {
431 Ok(())
432 }
433 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
434 Ok(())
435 }
436 fn mpu_complete(
437 &self,
438 _bucket: &str,
439 _upload_id: &str,
440 _final_key: &str,
441 _version: Option<&str>,
442 _meta: &ObjectMeta,
443 ) -> StoreResult<BodyRef> {
444 Ok(BodyRef::Memory(Bytes::new()))
445 }
446}
447
448pub struct DiskS3Store {
449 root: PathBuf,
450 cache: std::sync::Arc<crate::cache::BodyCache>,
451}
452
453impl DiskS3Store {
454 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
455 Self { root, cache }
456 }
457
458 fn buckets_dir(&self) -> PathBuf {
459 self.root.join("buckets")
460 }
461
462 fn bucket_dir(&self, bucket: &str) -> PathBuf {
463 self.buckets_dir()
464 .join(crate::key_escape::escape_key_segment(bucket))
465 }
466
467 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
468 self.bucket_dir(bucket)
469 .join("objects")
470 .join(crate::key_escape::escape_key_segment(key))
471 }
472
473 fn version_tag(version: Option<&str>) -> String {
474 version.unwrap_or("null").to_string()
475 }
476
477 fn object_paths(
478 &self,
479 bucket: &str,
480 key: &str,
481 version: Option<&str>,
482 ) -> (PathBuf, PathBuf, PathBuf) {
483 let dir = self.object_dir(bucket, key);
484 let tag = Self::version_tag(version);
485 let bin = dir.join(format!("{}.bin", tag));
486 let toml = dir.join(format!("{}.toml", tag));
487 (dir, bin, toml)
488 }
489
490 fn subresource_filename(kind: BucketSubresource) -> &'static str {
491 match kind {
492 BucketSubresource::Tags => "tags.toml",
493 BucketSubresource::Lifecycle => "lifecycle.toml",
494 BucketSubresource::Cors => "cors.toml",
495 BucketSubresource::Policy => "policy.toml",
496 BucketSubresource::Notification => "notification.toml",
497 BucketSubresource::Logging => "logging.toml",
498 BucketSubresource::Website => "website.toml",
499 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
500 BucketSubresource::ObjectLock => "object_lock.toml",
501 BucketSubresource::Replication => "replication.toml",
502 BucketSubresource::Ownership => "ownership.toml",
503 BucketSubresource::Inventory => "inventory.toml",
504 BucketSubresource::Encryption => "encryption.toml",
505 BucketSubresource::Versioning => "versioning.toml",
506 BucketSubresource::Acl => "acl.toml",
507 BucketSubresource::Accelerate => "accelerate.toml",
508 }
509 }
510
511 fn cleanup_empty(dir: &std::path::Path) {
512 let _ = std::fs::remove_dir(dir);
513 }
514
515 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
516 self.bucket_dir(bucket)
517 .join("mpu")
518 .join(crate::key_escape::escape_key_segment(upload_id))
519 }
520
521 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
522 self.mpu_dir(bucket, upload_id).join("parts")
523 }
524
525 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
526 self.mpu_parts_dir(bucket, upload_id)
527 .join(format!("{}.bin", part_number))
528 }
529
530 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
531 self.mpu_parts_dir(bucket, upload_id)
532 .join(format!("{}.toml", part_number))
533 }
534
535 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
536 crate::cache::BodyKey::new(
537 bucket.to_string(),
538 format!("__mpu__/{}", upload_id),
539 Some(format!("part-{}", part_number)),
540 )
541 }
542}
543
544fn io_other(msg: impl Into<String>) -> StoreError {
545 StoreError::Other(msg.into())
546}
547
548impl S3Store for DiskS3Store {
549 fn load(&self) -> StoreResult<S3State> {
550 let mut state = S3State::default();
551 let buckets_dir = self.buckets_dir();
552 if !buckets_dir.exists() {
553 return Ok(state);
554 }
555 for entry in std::fs::read_dir(&buckets_dir)? {
556 let entry = entry?;
557 if !entry.file_type()?.is_dir() {
558 continue;
559 }
560 let bdir = entry.path();
561 let meta_path = bdir.join("meta.toml");
562 if !meta_path.exists() {
563 continue;
564 }
565 let meta_text = std::fs::read_to_string(&meta_path)?;
566 let mut meta: BucketMeta =
567 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
568 let mut snap = BucketSnapshot {
569 meta: meta.clone(),
570 objects: HashMap::new(),
571 object_versions: HashMap::new(),
572 subresources: HashMap::new(),
573 multipart_uploads: HashMap::new(),
574 };
575
576 for kind in ALL_SUBRESOURCES {
577 let fname = Self::subresource_filename(*kind);
578 let path = bdir.join(fname);
579 if path.exists() {
580 let text = std::fs::read_to_string(&path)?;
581 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
582 let stripped = text.trim();
583 if !stripped.is_empty() {
584 snap.meta.versioning = Some(stripped.to_string());
585 meta.versioning = snap.meta.versioning.clone();
586 }
587 }
588 snap.subresources.insert(fname.to_string(), text);
589 }
590 }
591
592 let objects_root = bdir.join("objects");
593 if objects_root.exists() {
594 for okey_entry in std::fs::read_dir(&objects_root)? {
595 let okey_entry = okey_entry?;
596 if !okey_entry.file_type()?.is_dir() {
597 continue;
598 }
599 let key_dir = okey_entry.path();
600 let mut versioned: Vec<LoadedObject> = Vec::new();
601 let mut key_name: Option<String> = None;
602 for version_entry in std::fs::read_dir(&key_dir)? {
603 let version_entry = version_entry?;
604 let path = version_entry.path();
605 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
606 continue;
607 };
608 if !fname.ends_with(".toml") {
609 continue;
610 }
611 let version_tag = &fname[..fname.len() - 5];
612 let toml_text = std::fs::read_to_string(&path)?;
613 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
614 .map_err(|e| StoreError::Serde(e.to_string()))?;
615 let bin_path = key_dir.join(format!("{}.bin", version_tag));
616 let (body, size) = if obj_meta.is_delete_marker {
617 (BodyRef::Memory(Bytes::new()), 0u64)
618 } else if bin_path.exists() {
619 let sz = std::fs::metadata(&bin_path)?.len();
620 (
621 BodyRef::Disk {
622 bucket: meta.name.clone(),
623 key: obj_meta.key.clone(),
624 version: if version_tag == "null" {
625 None
626 } else {
627 Some(version_tag.to_string())
628 },
629 path: bin_path,
630 size: sz,
631 },
632 sz,
633 )
634 } else {
635 return Err(StoreError::Other(format!(
640 "missing body file: {}",
641 bin_path.display()
642 )));
643 };
644 let _ = size;
645 key_name.get_or_insert_with(|| obj_meta.key.clone());
646 if version_tag == "null" && obj_meta.version_id.is_none() {
647 snap.objects.insert(
648 obj_meta.key.clone(),
649 LoadedObject {
650 meta: obj_meta,
651 body,
652 },
653 );
654 } else {
655 versioned.push(LoadedObject {
656 meta: obj_meta,
657 body,
658 });
659 }
660 }
661 if !versioned.is_empty() {
662 versioned.sort_by_key(|v| v.meta.last_modified);
663 if let Some(key) = key_name {
664 match versioned.last() {
671 Some(newest) if newest.meta.is_delete_marker => {
672 snap.objects.remove(&key);
673 }
674 Some(newest) => {
675 snap.objects.insert(key.clone(), newest.clone());
676 }
677 None => {}
678 }
679 snap.object_versions.insert(key, versioned);
680 }
681 }
682 }
683 }
684
685 let mpu_root = bdir.join("mpu");
686 if mpu_root.exists() {
687 for upload_entry in std::fs::read_dir(&mpu_root)? {
688 let upload_entry = upload_entry?;
689 if !upload_entry.file_type()?.is_dir() {
690 continue;
691 }
692 let upload_dir = upload_entry.path();
693 let init_path = upload_dir.join("init.toml");
694 if !init_path.exists() {
695 continue;
696 }
697 let init_text = std::fs::read_to_string(&init_path)?;
698 let init: MpuInit =
699 toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
700 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
701 let parts_dir = upload_dir.join("parts");
702 if parts_dir.exists() {
703 for part_entry in std::fs::read_dir(&parts_dir)? {
704 let part_entry = part_entry?;
705 let path = part_entry.path();
706 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
707 continue;
708 };
709 if !fname.ends_with(".toml") {
710 continue;
711 }
712 let stem = &fname[..fname.len() - 5];
713 let Ok(part_number) = stem.parse::<u32>() else {
714 continue;
715 };
716 let toml_text = std::fs::read_to_string(&path)?;
717 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
718 .map_err(|e| StoreError::Serde(e.to_string()))?;
719 let bin_path = parts_dir.join(format!("{}.bin", part_number));
720 if !bin_path.exists() {
721 return Err(StoreError::Other(format!(
722 "missing multipart part body file: {}",
723 bin_path.display()
724 )));
725 }
726 let sz = std::fs::metadata(&bin_path)?.len();
727 let body = BodyRef::Disk {
728 bucket: meta.name.clone(),
729 key: format!("__mpu__/{}", init.upload_id),
730 version: Some(format!("part-{}", part_number)),
731 path: bin_path,
732 size: sz,
733 };
734 loaded_parts.insert(
735 part_number,
736 LoadedPart {
737 meta: part_meta,
738 body,
739 },
740 );
741 }
742 }
743 snap.multipart_uploads.insert(
744 init.upload_id.clone(),
745 LoadedMpu {
746 init,
747 parts: loaded_parts,
748 },
749 );
750 }
751 }
752
753 state.buckets.insert(meta.name.clone(), snap);
754 }
755 Ok(state)
756 }
757
758 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
759 let dir = self.bucket_dir(bucket);
760 std::fs::create_dir_all(&dir)?;
761 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
762 Ok(())
763 }
764
765 fn put_bucket_subresource(
766 &self,
767 bucket: &str,
768 kind: BucketSubresource,
769 payload: &str,
770 ) -> StoreResult<()> {
771 let dir = self.bucket_dir(bucket);
772 std::fs::create_dir_all(&dir)?;
773 let path = dir.join(Self::subresource_filename(kind));
774 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
775 Ok(())
776 }
777
778 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
779 let path = self
780 .bucket_dir(bucket)
781 .join(Self::subresource_filename(kind));
782 match std::fs::remove_file(&path) {
783 Ok(_) => Ok(()),
784 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
785 Err(e) => Err(e.into()),
786 }
787 }
788
789 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
790 let dir = self.bucket_dir(bucket);
791 match std::fs::remove_dir_all(&dir) {
792 Ok(_) => Ok(()),
793 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
794 Err(e) => Err(e.into()),
795 }
796 }
797
798 fn put_object(
799 &self,
800 bucket: &str,
801 key: &str,
802 version: Option<&str>,
803 body: BodySource,
804 meta: &ObjectMeta,
805 ) -> StoreResult<BodyRef> {
806 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
807 std::fs::create_dir_all(&dir)?;
808
809 if meta.is_delete_marker {
810 crate::atomic::write_atomic_toml(&toml_path, meta)?;
811 return Ok(BodyRef::Memory(Bytes::new()));
812 }
813
814 let size: u64;
815 let bytes_for_cache: Option<Bytes>;
816 match body {
817 BodySource::Bytes(b) => {
818 size = b.len() as u64;
819 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
820 bytes_for_cache = Some(b);
821 }
822 BodySource::File(src) => {
823 let src_size = std::fs::metadata(&src)?.len();
824 size = src_size;
825 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
826 bytes_for_cache = None;
827 }
828 BodySource::FileCopy(src) => {
829 let src_size = std::fs::metadata(&src)?.len();
830 size = src_size;
831 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
832 bytes_for_cache = None;
833 }
834 }
835
836 crate::atomic::write_atomic_toml(&toml_path, meta)?;
837
838 let body_key = crate::cache::BodyKey::new(
839 bucket.to_string(),
840 key.to_string(),
841 version.map(|s| s.to_string()),
842 );
843 if let Some(b) = bytes_for_cache {
844 self.cache.insert(body_key, b);
845 } else {
846 self.cache.invalidate(&crate::cache::BodyKey::new(
847 bucket.to_string(),
848 key.to_string(),
849 version.map(|s| s.to_string()),
850 ));
851 }
852
853 Ok(BodyRef::Disk {
854 bucket: bucket.to_string(),
855 key: key.to_string(),
856 version: version.map(|s| s.to_string()),
857 path: bin_path,
858 size,
859 })
860 }
861
862 fn put_object_meta(
863 &self,
864 bucket: &str,
865 key: &str,
866 version: Option<&str>,
867 meta: &ObjectMeta,
868 ) -> StoreResult<()> {
869 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
870 std::fs::create_dir_all(&dir)?;
871 crate::atomic::write_atomic_toml(&toml_path, meta)?;
872 Ok(())
873 }
874
875 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
876 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
877 for p in [&bin_path, &toml_path] {
878 match std::fs::remove_file(p) {
879 Ok(_) => {}
880 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
881 Err(e) => return Err(e.into()),
882 }
883 }
884 Self::cleanup_empty(&dir);
885
886 self.cache.invalidate(&crate::cache::BodyKey::new(
887 bucket.to_string(),
888 key.to_string(),
889 version.map(|s| s.to_string()),
890 ));
891 Ok(())
892 }
893
894 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
895 match body {
896 BodyRef::Memory(b) => Ok(b.clone()),
897 BodyRef::Disk {
898 bucket,
899 key,
900 version,
901 path,
902 size: _,
903 } => {
904 let body_key =
905 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
906 if let Some(bytes) = self.cache.get(&body_key) {
907 return Ok(bytes);
908 }
909 let bytes = Bytes::from(std::fs::read(path)?);
910 self.cache.insert(body_key, bytes.clone());
911 Ok(bytes)
912 }
913 }
914 }
915
916 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
917 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
918 std::fs::create_dir_all(&parts_dir)?;
919 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
920 crate::atomic::write_atomic_toml(&init_path, init)?;
921 Ok(())
922 }
923
924 fn mpu_put_part(
925 &self,
926 bucket: &str,
927 upload_id: &str,
928 part_number: u32,
929 body: BodySource,
930 etag: &str,
931 ) -> StoreResult<()> {
932 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
933 std::fs::create_dir_all(&parts_dir)?;
934 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
935 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
936
937 let size: u64 = match body {
938 BodySource::Bytes(b) => {
939 let n = b.len() as u64;
940 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
941 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
942 self.cache.insert(cache_key, b);
943 n
944 }
945 BodySource::File(src) => {
946 let n = std::fs::metadata(&src)?.len();
947 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
948 self.cache
949 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
950 n
951 }
952 BodySource::FileCopy(src) => {
953 let n = std::fs::metadata(&src)?.len();
954 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
955 self.cache
956 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
957 n
958 }
959 };
960
961 let meta = UploadPartMeta {
962 part_number,
963 etag: etag.to_string(),
964 size,
965 last_modified: Utc::now(),
966 };
967 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
968 Ok(())
969 }
970
971 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
972 let dir = self.mpu_dir(bucket, upload_id);
973 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
975 for entry in entries.flatten() {
976 let path = entry.path();
977 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
978 if let Some(stem) = fname.strip_suffix(".bin") {
979 if let Ok(n) = stem.parse::<u32>() {
980 self.cache
981 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
982 }
983 }
984 }
985 }
986 }
987 match std::fs::remove_dir_all(&dir) {
988 Ok(_) => Ok(()),
989 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
990 Err(e) => Err(e.into()),
991 }
992 }
993
994 fn mpu_complete(
995 &self,
996 bucket: &str,
997 upload_id: &str,
998 final_key: &str,
999 version: Option<&str>,
1000 meta: &ObjectMeta,
1001 ) -> StoreResult<BodyRef> {
1002 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1003 if !parts_dir.exists() {
1004 return Err(io_other(format!(
1005 "mpu_complete: no parts dir for upload {}",
1006 upload_id
1007 )));
1008 }
1009
1010 let mut part_numbers: Vec<u32> = Vec::new();
1012 for entry in std::fs::read_dir(&parts_dir)? {
1013 let entry = entry?;
1014 let path = entry.path();
1015 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1016 continue;
1017 };
1018 if let Some(stem) = fname.strip_suffix(".toml") {
1019 if let Ok(n) = stem.parse::<u32>() {
1020 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1021 part_numbers.push(n);
1022 }
1023 }
1024 }
1025 }
1026 part_numbers.sort_unstable();
1027 if part_numbers.is_empty() {
1028 return Err(io_other(format!(
1029 "mpu_complete: upload {} has no parts",
1030 upload_id
1031 )));
1032 }
1033
1034 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1035 std::fs::create_dir_all(&dir)?;
1036
1037 let total_size: u64 = if part_numbers.len() == 1 {
1038 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1039 let sz = std::fs::metadata(&only)?.len();
1040 match std::fs::rename(&only, &bin_path) {
1041 Ok(_) => {}
1042 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1043 {
1045 let mut input = std::fs::File::open(&only)?;
1046 let tmp = {
1047 let mut os = bin_path.as_os_str().to_owned();
1048 os.push(".tmp");
1049 PathBuf::from(os)
1050 };
1051 let mut out = std::fs::OpenOptions::new()
1052 .write(true)
1053 .create(true)
1054 .truncate(true)
1055 .open(&tmp)?;
1056 std::io::copy(&mut input, &mut out)?;
1057 out.sync_all()?;
1058 std::fs::rename(&tmp, &bin_path)?;
1059 }
1060 let _ = std::fs::remove_file(&only);
1061 }
1062 Err(e) => return Err(e.into()),
1063 }
1064 sz
1065 } else {
1066 let tmp = {
1067 let mut os = bin_path.as_os_str().to_owned();
1068 os.push(".tmp");
1069 PathBuf::from(os)
1070 };
1071 let mut total: u64 = 0;
1072 {
1073 let mut out = std::fs::OpenOptions::new()
1074 .write(true)
1075 .create(true)
1076 .truncate(true)
1077 .open(&tmp)?;
1078 for n in &part_numbers {
1079 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1080 let mut input = std::fs::File::open(&part_path)?;
1081 let copied = std::io::copy(&mut input, &mut out)?;
1082 total += copied;
1083 }
1084 out.sync_all()?;
1085 }
1086 std::fs::rename(&tmp, &bin_path)?;
1087 if let Some(parent) = bin_path.parent() {
1088 if let Ok(dir_handle) = std::fs::File::open(parent) {
1089 let _ = dir_handle.sync_all();
1090 }
1091 }
1092 total
1093 };
1094
1095 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1096
1097 for n in &part_numbers {
1099 self.cache
1100 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1101 }
1102 let mpu_dir = self.mpu_dir(bucket, upload_id);
1103 let _ = std::fs::remove_dir_all(&mpu_dir);
1104
1105 self.cache.invalidate(&crate::cache::BodyKey::new(
1110 bucket.to_string(),
1111 final_key.to_string(),
1112 version.map(|s| s.to_string()),
1113 ));
1114
1115 Ok(BodyRef::Disk {
1116 bucket: bucket.to_string(),
1117 key: final_key.to_string(),
1118 version: version.map(|s| s.to_string()),
1119 path: bin_path,
1120 size: total_size,
1121 })
1122 }
1123}
1124
1125fn libc_exdev() -> i32 {
1126 18
1127}
1128
1129#[cfg(test)]
1130mod disk_tests {
1131 use super::*;
1132 use std::sync::Arc;
1133 use tempfile::TempDir;
1134
1135 fn new_store(tmp: &TempDir) -> DiskS3Store {
1136 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1137 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1138 }
1139
1140 fn new_store_with_cache(
1141 tmp: &TempDir,
1142 cap: u64,
1143 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1144 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1145 (
1146 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1147 cache,
1148 )
1149 }
1150
1151 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1152 ObjectMeta {
1153 key: key.to_string(),
1154 content_type: "application/octet-stream".to_string(),
1155 etag: "etag".to_string(),
1156 size,
1157 ..Default::default()
1158 }
1159 }
1160
1161 #[test]
1162 fn put_bucket_meta_roundtrip() {
1163 let tmp = TempDir::new().unwrap();
1164 let store = new_store(&tmp);
1165 let meta = BucketMeta {
1166 name: "b1".to_string(),
1167 region: "us-east-1".to_string(),
1168 versioning: Some("Enabled".to_string()),
1169 ..Default::default()
1170 };
1171 store.put_bucket_meta("b1", &meta).unwrap();
1172 let loaded = store.load().unwrap();
1173 let snap = loaded.buckets.get("b1").unwrap();
1174 assert_eq!(snap.meta.name, "b1");
1175 assert_eq!(snap.meta.region, "us-east-1");
1176 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
1177 }
1178
1179 #[test]
1180 fn put_bucket_subresource_each_variant_writes_file() {
1181 let tmp = TempDir::new().unwrap();
1182 let store = new_store(&tmp);
1183 store
1184 .put_bucket_meta(
1185 "b",
1186 &BucketMeta {
1187 name: "b".to_string(),
1188 ..Default::default()
1189 },
1190 )
1191 .unwrap();
1192 let variants = [
1193 BucketSubresource::Tags,
1194 BucketSubresource::Lifecycle,
1195 BucketSubresource::Cors,
1196 BucketSubresource::Policy,
1197 BucketSubresource::Notification,
1198 BucketSubresource::Logging,
1199 BucketSubresource::Website,
1200 BucketSubresource::PublicAccessBlock,
1201 BucketSubresource::ObjectLock,
1202 BucketSubresource::Replication,
1203 BucketSubresource::Ownership,
1204 BucketSubresource::Inventory,
1205 BucketSubresource::Encryption,
1206 BucketSubresource::Versioning,
1207 BucketSubresource::Acl,
1208 BucketSubresource::Accelerate,
1209 ];
1210 for v in variants {
1211 store
1212 .put_bucket_subresource("b", v, "payload=true")
1213 .unwrap();
1214 let file = store
1215 .bucket_dir("b")
1216 .join(DiskS3Store::subresource_filename(v));
1217 assert!(file.exists(), "{:?}", v);
1218 assert_eq!(std::fs::read_to_string(&file).unwrap(), "payload=true");
1219 }
1220 }
1221
1222 #[test]
1223 fn delete_bucket_subresource_removes_file() {
1224 let tmp = TempDir::new().unwrap();
1225 let store = new_store(&tmp);
1226 store
1227 .put_bucket_meta(
1228 "b",
1229 &BucketMeta {
1230 name: "b".to_string(),
1231 ..Default::default()
1232 },
1233 )
1234 .unwrap();
1235 store
1236 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1237 .unwrap();
1238 store
1239 .delete_bucket_subresource("b", BucketSubresource::Tags)
1240 .unwrap();
1241 let file = store.bucket_dir("b").join("tags.toml");
1242 assert!(!file.exists());
1243 store
1245 .delete_bucket_subresource("b", BucketSubresource::Tags)
1246 .unwrap();
1247 }
1248
1249 #[test]
1250 fn put_object_bytes_roundtrip() {
1251 let tmp = TempDir::new().unwrap();
1252 let store = new_store(&tmp);
1253 store
1254 .put_bucket_meta(
1255 "b",
1256 &BucketMeta {
1257 name: "b".to_string(),
1258 ..Default::default()
1259 },
1260 )
1261 .unwrap();
1262 let data = Bytes::from_static(b"hello world");
1263 let meta = sample_meta("k1", data.len() as u64);
1264 let body_ref = store
1265 .put_object("b", "k1", None, BodySource::Bytes(data.clone()), &meta)
1266 .unwrap();
1267 match &body_ref {
1268 BodyRef::Disk {
1269 bucket,
1270 key,
1271 size,
1272 path,
1273 ..
1274 } => {
1275 assert_eq!(bucket, "b");
1276 assert_eq!(key, "k1");
1277 assert_eq!(*size, data.len() as u64);
1278 assert_eq!(std::fs::read(path).unwrap(), data.to_vec());
1279 }
1280 _ => panic!("expected Disk"),
1281 }
1282 let loaded = store.load().unwrap();
1283 let snap = loaded.buckets.get("b").unwrap();
1284 let obj = snap.objects.get("k1").unwrap();
1285 assert_eq!(obj.meta.size, data.len() as u64);
1286 }
1287
1288 #[test]
1289 fn put_object_file_copy_source_leaves_src_in_place() {
1290 let tmp = TempDir::new().unwrap();
1291 let store = new_store(&tmp);
1292 store
1293 .put_bucket_meta(
1294 "b",
1295 &BucketMeta {
1296 name: "b".to_string(),
1297 ..Default::default()
1298 },
1299 )
1300 .unwrap();
1301 let src_dir = TempDir::new().unwrap();
1302 let src = src_dir.path().join("src.bin");
1303 std::fs::write(&src, b"copied-body").unwrap();
1304 let meta = sample_meta("k", 11);
1305 let bref = store
1306 .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
1307 .unwrap();
1308 match bref {
1309 BodyRef::Disk { path, size, .. } => {
1310 assert_eq!(size, 11);
1311 assert_eq!(std::fs::read(&path).unwrap(), b"copied-body");
1312 }
1313 _ => panic!("expected Disk bodyref"),
1314 }
1315 assert!(src.exists(), "source file must not be moved by FileCopy");
1316 assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
1317 }
1318
1319 #[test]
1320 fn put_object_file_source() {
1321 let tmp = TempDir::new().unwrap();
1322 let store = new_store(&tmp);
1323 store
1324 .put_bucket_meta(
1325 "b",
1326 &BucketMeta {
1327 name: "b".to_string(),
1328 ..Default::default()
1329 },
1330 )
1331 .unwrap();
1332 let src = tmp.path().join("src.bin");
1333 std::fs::write(&src, b"file-body").unwrap();
1334 let meta = sample_meta("k", 9);
1335 let body_ref = store
1336 .put_object("b", "k", None, BodySource::File(src.clone()), &meta)
1337 .unwrap();
1338 let path = match body_ref {
1339 BodyRef::Disk { path, .. } => path,
1340 _ => panic!(),
1341 };
1342 assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
1343 }
1344
1345 #[test]
1346 fn put_object_meta_only_keeps_bin() {
1347 let tmp = TempDir::new().unwrap();
1348 let store = new_store(&tmp);
1349 store
1350 .put_bucket_meta(
1351 "b",
1352 &BucketMeta {
1353 name: "b".to_string(),
1354 ..Default::default()
1355 },
1356 )
1357 .unwrap();
1358 let data = Bytes::from_static(b"abc");
1359 let mut meta = sample_meta("k", 3);
1360 store
1361 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1362 .unwrap();
1363 let (_, bin, _) = store.object_paths("b", "k", None);
1364 let before = std::fs::read(&bin).unwrap();
1365 meta.tags.insert("x".to_string(), "y".to_string());
1366 store.put_object_meta("b", "k", None, &meta).unwrap();
1367 assert_eq!(std::fs::read(&bin).unwrap(), before);
1368 let loaded = store.load().unwrap();
1369 let obj = loaded.buckets.get("b").unwrap().objects.get("k").unwrap();
1370 assert_eq!(obj.meta.tags.get("x").map(String::as_str), Some("y"));
1371 }
1372
1373 #[test]
1374 fn delete_object_cleans_up_files_and_cache() {
1375 let tmp = TempDir::new().unwrap();
1376 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1377 store
1378 .put_bucket_meta(
1379 "b",
1380 &BucketMeta {
1381 name: "b".to_string(),
1382 ..Default::default()
1383 },
1384 )
1385 .unwrap();
1386 let data = Bytes::from_static(b"bye");
1387 let meta = sample_meta("k", 3);
1388 store
1389 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1390 .unwrap();
1391 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1392 assert!(cache.get(&body_key).is_some());
1393 store.delete_object("b", "k", None).unwrap();
1394 let (dir, bin, toml_path) = store.object_paths("b", "k", None);
1395 assert!(!bin.exists());
1396 assert!(!toml_path.exists());
1397 assert!(!dir.exists());
1398 assert!(cache.get(&body_key).is_none());
1399 }
1400
1401 #[test]
1402 fn open_object_body_cache_hit_and_refill() {
1403 let tmp = TempDir::new().unwrap();
1404 let (store, cache) = new_store_with_cache(&tmp, 1024 * 1024);
1405 store
1406 .put_bucket_meta(
1407 "b",
1408 &BucketMeta {
1409 name: "b".to_string(),
1410 ..Default::default()
1411 },
1412 )
1413 .unwrap();
1414 let data = Bytes::from_static(b"payload");
1415 let meta = sample_meta("k", data.len() as u64);
1416 let body_ref = store
1417 .put_object("b", "k", None, BodySource::Bytes(data.clone()), &meta)
1418 .unwrap();
1419 let got = store.open_object_body(&body_ref).unwrap();
1421 assert_eq!(got, data);
1422 let body_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
1424 cache.invalidate(&body_key);
1425 assert!(cache.get(&body_key).is_none());
1426 let got = store.open_object_body(&body_ref).unwrap();
1427 assert_eq!(got, data);
1428 assert!(cache.get(&body_key).is_some());
1429 }
1430
1431 #[test]
1432 fn open_object_body_large_bypasses_cache() {
1433 let tmp = TempDir::new().unwrap();
1434 let (store, cache) = new_store_with_cache(&tmp, 1024);
1436 store
1437 .put_bucket_meta(
1438 "b",
1439 &BucketMeta {
1440 name: "b".to_string(),
1441 ..Default::default()
1442 },
1443 )
1444 .unwrap();
1445 let data = Bytes::from(vec![7u8; 800]);
1446 let meta = sample_meta("big", 800);
1447 let body_ref = store
1448 .put_object("b", "big", None, BodySource::Bytes(data.clone()), &meta)
1449 .unwrap();
1450 let body_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
1451 assert!(cache.get(&body_key).is_none());
1452 let got = store.open_object_body(&body_ref).unwrap();
1453 assert_eq!(got, data);
1454 assert!(cache.get(&body_key).is_none());
1456 }
1457
1458 #[test]
1459 fn load_empty_dir() {
1460 let tmp = TempDir::new().unwrap();
1461 let store = new_store(&tmp);
1462 let s = store.load().unwrap();
1463 assert!(s.buckets.is_empty());
1464 }
1465
1466 #[test]
1467 fn load_skips_mpu_without_init() {
1468 let tmp = TempDir::new().unwrap();
1469 let store = new_store(&tmp);
1470 store
1471 .put_bucket_meta(
1472 "b",
1473 &BucketMeta {
1474 name: "b".to_string(),
1475 ..Default::default()
1476 },
1477 )
1478 .unwrap();
1479 let data = Bytes::from_static(b"x");
1480 let meta = sample_meta("k", 1);
1481 store
1482 .put_object("b", "k", None, BodySource::Bytes(data), &meta)
1483 .unwrap();
1484 let mpu = store.bucket_dir("b").join("mpu").join("upload1");
1486 std::fs::create_dir_all(&mpu).unwrap();
1487
1488 let loaded = store.load().unwrap();
1489 let snap = loaded.buckets.get("b").unwrap();
1490 assert_eq!(snap.objects.len(), 1);
1491 assert!(snap.multipart_uploads.is_empty());
1492 }
1493
1494 #[test]
1495 fn load_reads_bucket_subresources() {
1496 let tmp = TempDir::new().unwrap();
1497 let store = new_store(&tmp);
1498 store
1499 .put_bucket_meta(
1500 "b",
1501 &BucketMeta {
1502 name: "b".to_string(),
1503 ..Default::default()
1504 },
1505 )
1506 .unwrap();
1507 store
1508 .put_bucket_subresource("b", BucketSubresource::Lifecycle, "<Lifecycle/>")
1509 .unwrap();
1510 store
1511 .put_bucket_subresource("b", BucketSubresource::Cors, "<Cors/>")
1512 .unwrap();
1513 store
1514 .put_bucket_subresource("b", BucketSubresource::Policy, "{}")
1515 .unwrap();
1516 store
1517 .put_bucket_subresource("b", BucketSubresource::Tags, "x=1")
1518 .unwrap();
1519 let loaded = store.load().unwrap();
1520 let snap = loaded.buckets.get("b").unwrap();
1521 assert_eq!(
1522 snap.subresources.get("lifecycle.toml").map(String::as_str),
1523 Some("<Lifecycle/>"),
1524 );
1525 assert_eq!(
1526 snap.subresources.get("cors.toml").map(String::as_str),
1527 Some("<Cors/>"),
1528 );
1529 assert_eq!(
1530 snap.subresources.get("policy.toml").map(String::as_str),
1531 Some("{}"),
1532 );
1533 assert_eq!(
1534 snap.subresources.get("tags.toml").map(String::as_str),
1535 Some("x=1"),
1536 );
1537 }
1538
1539 #[test]
1540 fn versioned_put_load_roundtrip() {
1541 let tmp = TempDir::new().unwrap();
1542 let store = new_store(&tmp);
1543 store
1544 .put_bucket_meta(
1545 "b",
1546 &BucketMeta {
1547 name: "b".to_string(),
1548 versioning: Some("Enabled".to_string()),
1549 ..Default::default()
1550 },
1551 )
1552 .unwrap();
1553 let base = chrono::Utc::now();
1554 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1555 .iter()
1556 .enumerate()
1557 {
1558 let mut m = sample_meta("k", body.len() as u64);
1559 m.version_id = Some((*vid).to_string());
1560 m.last_modified = base + chrono::Duration::seconds(i as i64);
1561 store
1562 .put_object(
1563 "b",
1564 "k",
1565 Some(*vid),
1566 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1567 &m,
1568 )
1569 .unwrap();
1570 }
1571 let loaded = store.load().unwrap();
1572 let snap = loaded.buckets.get("b").unwrap();
1573 let versions = snap.object_versions.get("k").expect("versions present");
1574 assert_eq!(versions.len(), 3);
1575 assert_eq!(versions[0].meta.version_id.as_deref(), Some("v1"));
1576 assert_eq!(versions[1].meta.version_id.as_deref(), Some("v2"));
1577 assert_eq!(versions[2].meta.version_id.as_deref(), Some("v3"));
1578 for v in versions {
1579 match &v.body {
1580 BodyRef::Disk { size, .. } => assert!(*size > 0),
1581 _ => panic!("expected Disk body"),
1582 }
1583 }
1584 }
1585
1586 #[test]
1587 fn versioned_load_promotes_latest_live_to_snap_objects() {
1588 let tmp = TempDir::new().unwrap();
1589 let store = new_store(&tmp);
1590 store
1591 .put_bucket_meta(
1592 "b",
1593 &BucketMeta {
1594 name: "b".to_string(),
1595 versioning: Some("Enabled".to_string()),
1596 ..Default::default()
1597 },
1598 )
1599 .unwrap();
1600 let base = chrono::Utc::now();
1601 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1602 .iter()
1603 .enumerate()
1604 {
1605 let mut m = sample_meta("k", body.len() as u64);
1606 m.version_id = Some((*vid).to_string());
1607 m.last_modified = base + chrono::Duration::seconds(i as i64);
1608 store
1609 .put_object(
1610 "b",
1611 "k",
1612 Some(*vid),
1613 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1614 &m,
1615 )
1616 .unwrap();
1617 }
1618 let loaded = store.load().unwrap();
1619 let snap = loaded.buckets.get("b").unwrap();
1620 let current = snap.objects.get("k").expect("current object promoted");
1621 assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
1622 }
1623
1624 #[test]
1625 fn versioned_load_trailing_delete_marker_hides_current() {
1626 let tmp = TempDir::new().unwrap();
1627 let store = new_store(&tmp);
1628 store
1629 .put_bucket_meta(
1630 "b",
1631 &BucketMeta {
1632 name: "b".to_string(),
1633 versioning: Some("Enabled".to_string()),
1634 ..Default::default()
1635 },
1636 )
1637 .unwrap();
1638 let base = chrono::Utc::now();
1639 for (i, (vid, body)) in [("v1", "one"), ("v2", "two"), ("v3", "three")]
1640 .iter()
1641 .enumerate()
1642 {
1643 let mut m = sample_meta("k", body.len() as u64);
1644 m.version_id = Some((*vid).to_string());
1645 m.last_modified = base + chrono::Duration::seconds(i as i64);
1646 store
1647 .put_object(
1648 "b",
1649 "k",
1650 Some(*vid),
1651 BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes())),
1652 &m,
1653 )
1654 .unwrap();
1655 }
1656 let mut dm = sample_meta("k", 0);
1658 dm.version_id = Some("dm1".to_string());
1659 dm.is_delete_marker = true;
1660 dm.last_modified = base + chrono::Duration::seconds(10);
1661 store
1662 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1663 .unwrap();
1664 let loaded = store.load().unwrap();
1665 let snap = loaded.buckets.get("b").unwrap();
1666 assert!(
1667 !snap.objects.contains_key("k"),
1668 "trailing delete marker must hide current object",
1669 );
1670 assert_eq!(snap.object_versions.get("k").unwrap().len(), 4);
1671 }
1672
1673 #[test]
1674 fn legacy_null_object_overridden_by_newer_versions() {
1675 let tmp = TempDir::new().unwrap();
1678 let store = new_store(&tmp);
1679 store
1680 .put_bucket_meta(
1681 "b",
1682 &BucketMeta {
1683 name: "b".to_string(),
1684 ..Default::default()
1685 },
1686 )
1687 .unwrap();
1688 let base = chrono::Utc::now();
1689 let mut null_meta = sample_meta("k", 3);
1690 null_meta.last_modified = base;
1691 store
1692 .put_object(
1693 "b",
1694 "k",
1695 None,
1696 BodySource::Bytes(Bytes::from_static(b"old")),
1697 &null_meta,
1698 )
1699 .unwrap();
1700 store
1702 .put_bucket_meta(
1703 "b",
1704 &BucketMeta {
1705 name: "b".to_string(),
1706 versioning: Some("Enabled".to_string()),
1707 ..Default::default()
1708 },
1709 )
1710 .unwrap();
1711 let mut v1 = sample_meta("k", 3);
1712 v1.version_id = Some("v1".to_string());
1713 v1.last_modified = base + chrono::Duration::seconds(1);
1714 store
1715 .put_object(
1716 "b",
1717 "k",
1718 Some("v1"),
1719 BodySource::Bytes(Bytes::from_static(b"new")),
1720 &v1,
1721 )
1722 .unwrap();
1723 let mut v2 = sample_meta("k", 5);
1724 v2.version_id = Some("v2".to_string());
1725 v2.last_modified = base + chrono::Duration::seconds(2);
1726 store
1727 .put_object(
1728 "b",
1729 "k",
1730 Some("v2"),
1731 BodySource::Bytes(Bytes::from_static(b"newer")),
1732 &v2,
1733 )
1734 .unwrap();
1735 let loaded = store.load().unwrap();
1736 let snap = loaded.buckets.get("b").unwrap();
1737 let current = snap
1738 .objects
1739 .get("k")
1740 .expect("latest live version must override stale null");
1741 assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
1742 assert_eq!(current.meta.size, 5);
1743 }
1744
1745 #[test]
1746 fn legacy_null_hidden_by_trailing_delete_marker() {
1747 let tmp = TempDir::new().unwrap();
1748 let store = new_store(&tmp);
1749 store
1750 .put_bucket_meta(
1751 "b",
1752 &BucketMeta {
1753 name: "b".to_string(),
1754 ..Default::default()
1755 },
1756 )
1757 .unwrap();
1758 let base = chrono::Utc::now();
1759 let mut null_meta = sample_meta("k", 3);
1760 null_meta.last_modified = base;
1761 store
1762 .put_object(
1763 "b",
1764 "k",
1765 None,
1766 BodySource::Bytes(Bytes::from_static(b"old")),
1767 &null_meta,
1768 )
1769 .unwrap();
1770 store
1771 .put_bucket_meta(
1772 "b",
1773 &BucketMeta {
1774 name: "b".to_string(),
1775 versioning: Some("Enabled".to_string()),
1776 ..Default::default()
1777 },
1778 )
1779 .unwrap();
1780 let mut v1 = sample_meta("k", 3);
1781 v1.version_id = Some("v1".to_string());
1782 v1.last_modified = base + chrono::Duration::seconds(1);
1783 store
1784 .put_object(
1785 "b",
1786 "k",
1787 Some("v1"),
1788 BodySource::Bytes(Bytes::from_static(b"new")),
1789 &v1,
1790 )
1791 .unwrap();
1792 let mut dm = sample_meta("k", 0);
1793 dm.version_id = Some("dm1".to_string());
1794 dm.is_delete_marker = true;
1795 dm.last_modified = base + chrono::Duration::seconds(2);
1796 store
1797 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &dm)
1798 .unwrap();
1799 let loaded = store.load().unwrap();
1800 let snap = loaded.buckets.get("b").unwrap();
1801 assert!(
1802 !snap.objects.contains_key("k"),
1803 "trailing delete marker must hide even a legacy null object",
1804 );
1805 }
1806
1807 #[test]
1808 fn delete_marker_roundtrip_no_body_file() {
1809 let tmp = TempDir::new().unwrap();
1810 let store = new_store(&tmp);
1811 store
1812 .put_bucket_meta(
1813 "b",
1814 &BucketMeta {
1815 name: "b".to_string(),
1816 versioning: Some("Enabled".to_string()),
1817 ..Default::default()
1818 },
1819 )
1820 .unwrap();
1821 let mut m = sample_meta("k", 0);
1822 m.version_id = Some("dm1".to_string());
1823 m.is_delete_marker = true;
1824 store
1825 .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &m)
1826 .unwrap();
1827 let (_, bin, toml_path) = store.object_paths("b", "k", Some("dm1"));
1829 assert!(!bin.exists(), "delete marker must not have a .bin file");
1830 assert!(toml_path.exists());
1831
1832 let loaded = store.load().unwrap();
1833 let versions = loaded
1834 .buckets
1835 .get("b")
1836 .unwrap()
1837 .object_versions
1838 .get("k")
1839 .unwrap();
1840 assert_eq!(versions.len(), 1);
1841 assert!(versions[0].meta.is_delete_marker);
1842 match &versions[0].body {
1843 BodyRef::Memory(b) => assert_eq!(b.len(), 0),
1844 _ => panic!("delete marker body should be empty Memory"),
1845 }
1846 }
1847
1848 #[test]
1849 fn mixed_nonversioned_and_versioned_buckets() {
1850 let tmp = TempDir::new().unwrap();
1851 let store = new_store(&tmp);
1852 store
1853 .put_bucket_meta(
1854 "a",
1855 &BucketMeta {
1856 name: "a".to_string(),
1857 ..Default::default()
1858 },
1859 )
1860 .unwrap();
1861 store
1862 .put_bucket_meta(
1863 "b",
1864 &BucketMeta {
1865 name: "b".to_string(),
1866 versioning: Some("Enabled".to_string()),
1867 ..Default::default()
1868 },
1869 )
1870 .unwrap();
1871 let ma = sample_meta("only", 3);
1872 store
1873 .put_object(
1874 "a",
1875 "only",
1876 None,
1877 BodySource::Bytes(Bytes::from_static(b"aaa")),
1878 &ma,
1879 )
1880 .unwrap();
1881 let base = chrono::Utc::now();
1882 for (i, vid) in ["v1", "v2"].iter().enumerate() {
1883 let mut m = sample_meta("twice", 2);
1884 m.version_id = Some((*vid).to_string());
1885 m.last_modified = base + chrono::Duration::seconds(i as i64);
1886 store
1887 .put_object(
1888 "b",
1889 "twice",
1890 Some(*vid),
1891 BodySource::Bytes(Bytes::from_static(b"xx")),
1892 &m,
1893 )
1894 .unwrap();
1895 }
1896 let loaded = store.load().unwrap();
1897 assert_eq!(loaded.buckets.len(), 2);
1898 let a = loaded.buckets.get("a").unwrap();
1899 assert_eq!(a.objects.len(), 1);
1900 assert!(a.object_versions.is_empty());
1901 let b = loaded.buckets.get("b").unwrap();
1902 assert_eq!(
1905 b.objects.get("twice").unwrap().meta.version_id.as_deref(),
1906 Some("v2")
1907 );
1908 assert_eq!(b.object_versions.get("twice").unwrap().len(), 2);
1909 }
1910
1911 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1912 MpuInit {
1913 upload_id: upload_id.to_string(),
1914 key: key.to_string(),
1915 initiated: chrono::Utc::now(),
1916 content_type: "application/octet-stream".to_string(),
1917 storage_class: "STANDARD".to_string(),
1918 ..Default::default()
1919 }
1920 }
1921
1922 #[test]
1923 fn mpu_create_then_load_empty_parts() {
1924 let tmp = TempDir::new().unwrap();
1925 let store = new_store(&tmp);
1926 store
1927 .put_bucket_meta(
1928 "b",
1929 &BucketMeta {
1930 name: "b".to_string(),
1931 ..Default::default()
1932 },
1933 )
1934 .unwrap();
1935 let init = sample_mpu_init("up1", "k1");
1936 store.mpu_create("b", "up1", &init).unwrap();
1937 let loaded = store.load().unwrap();
1938 let snap = loaded.buckets.get("b").unwrap();
1939 let m = snap.multipart_uploads.get("up1").expect("upload present");
1940 assert_eq!(m.init.upload_id, "up1");
1941 assert_eq!(m.init.key, "k1");
1942 assert!(m.parts.is_empty());
1943 }
1944
1945 #[test]
1946 fn mpu_put_part_then_load_three_parts() {
1947 let tmp = TempDir::new().unwrap();
1948 let store = new_store(&tmp);
1949 store
1950 .put_bucket_meta(
1951 "b",
1952 &BucketMeta {
1953 name: "b".to_string(),
1954 ..Default::default()
1955 },
1956 )
1957 .unwrap();
1958 store
1959 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
1960 .unwrap();
1961 let bodies = [
1962 Bytes::from_static(b"part-one"),
1963 Bytes::from_static(b"part-two-longer"),
1964 Bytes::from_static(b"p3"),
1965 ];
1966 for (i, body) in bodies.iter().enumerate() {
1967 let n = (i + 1) as u32;
1968 store
1969 .mpu_put_part(
1970 "b",
1971 "up",
1972 n,
1973 BodySource::Bytes(body.clone()),
1974 &format!("et{}", n),
1975 )
1976 .unwrap();
1977 }
1978 let loaded = store.load().unwrap();
1979 let snap = loaded.buckets.get("b").unwrap();
1980 let m = snap.multipart_uploads.get("up").unwrap();
1981 assert_eq!(m.parts.len(), 3);
1982 for (i, body) in bodies.iter().enumerate() {
1983 let n = (i + 1) as u32;
1984 let part = m.parts.get(&n).unwrap();
1985 assert_eq!(part.meta.part_number, n);
1986 assert_eq!(part.meta.size, body.len() as u64);
1987 assert_eq!(part.meta.etag, format!("et{}", n));
1988 match &part.body {
1989 BodyRef::Disk { path, size, .. } => {
1990 assert_eq!(*size, body.len() as u64);
1991 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
1992 }
1993 _ => panic!("expected Disk"),
1994 }
1995 }
1996 }
1997
1998 #[test]
1999 fn mpu_abort_removes_upload() {
2000 let tmp = TempDir::new().unwrap();
2001 let store = new_store(&tmp);
2002 store
2003 .put_bucket_meta(
2004 "b",
2005 &BucketMeta {
2006 name: "b".to_string(),
2007 ..Default::default()
2008 },
2009 )
2010 .unwrap();
2011 store
2012 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2013 .unwrap();
2014 store
2015 .mpu_put_part(
2016 "b",
2017 "up",
2018 1,
2019 BodySource::Bytes(Bytes::from_static(b"x")),
2020 "e",
2021 )
2022 .unwrap();
2023 store.mpu_abort("b", "up").unwrap();
2024 assert!(!store.mpu_dir("b", "up").exists());
2025 store.mpu_abort("b", "up").unwrap();
2027 let loaded = store.load().unwrap();
2028 let snap = loaded.buckets.get("b").unwrap();
2029 assert!(snap.multipart_uploads.is_empty());
2030 }
2031
2032 #[test]
2033 fn mpu_complete_single_part_fast_path() {
2034 let tmp = TempDir::new().unwrap();
2035 let store = new_store(&tmp);
2036 store
2037 .put_bucket_meta(
2038 "b",
2039 &BucketMeta {
2040 name: "b".to_string(),
2041 ..Default::default()
2042 },
2043 )
2044 .unwrap();
2045 store
2046 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2047 .unwrap();
2048 let body = Bytes::from_static(b"only-part-bytes");
2049 store
2050 .mpu_put_part("b", "up", 1, BodySource::Bytes(body.clone()), "et")
2051 .unwrap();
2052 let meta = sample_meta("k", body.len() as u64);
2053 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2054 match &body_ref {
2055 BodyRef::Disk { path, size, .. } => {
2056 assert_eq!(*size, body.len() as u64);
2057 assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
2058 }
2059 _ => panic!("expected Disk"),
2060 }
2061 assert!(!store.mpu_dir("b", "up").exists());
2062 }
2063
2064 #[test]
2065 fn mpu_complete_multi_part_concat() {
2066 let tmp = TempDir::new().unwrap();
2067 let store = new_store(&tmp);
2068 store
2069 .put_bucket_meta(
2070 "b",
2071 &BucketMeta {
2072 name: "b".to_string(),
2073 ..Default::default()
2074 },
2075 )
2076 .unwrap();
2077 store
2078 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2079 .unwrap();
2080 let p1 = Bytes::from_static(b"AAAA");
2081 let p2 = Bytes::from_static(b"BBBBBB");
2082 let p3 = Bytes::from_static(b"CC");
2083 for (n, b) in [(1u32, &p1), (2, &p2), (3, &p3)] {
2084 store
2085 .mpu_put_part("b", "up", n, BodySource::Bytes(b.clone()), "e")
2086 .unwrap();
2087 }
2088 let mut expected = Vec::new();
2089 expected.extend_from_slice(&p1);
2090 expected.extend_from_slice(&p2);
2091 expected.extend_from_slice(&p3);
2092 let meta = sample_meta("k", expected.len() as u64);
2093 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2094 let path = match body_ref {
2095 BodyRef::Disk { path, size, .. } => {
2096 assert_eq!(size, expected.len() as u64);
2097 path
2098 }
2099 _ => panic!("expected Disk"),
2100 };
2101 assert_eq!(std::fs::read(&path).unwrap(), expected);
2102 assert!(!store.mpu_dir("b", "up").exists());
2103 }
2104
2105 #[test]
2106 fn mpu_complete_large_streaming_via_file_source() {
2107 let tmp = TempDir::new().unwrap();
2108 let store = new_store(&tmp);
2109 store
2110 .put_bucket_meta(
2111 "b",
2112 &BucketMeta {
2113 name: "b".to_string(),
2114 ..Default::default()
2115 },
2116 )
2117 .unwrap();
2118 store
2119 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2120 .unwrap();
2121
2122 const PART_SIZE: usize = 1024 * 1024;
2125 let patterns: [u8; 3] = [0x11, 0x22, 0x33];
2126 let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
2127 for (i, byte) in patterns.iter().enumerate() {
2128 let src = tmp.path().join(format!("src-{}.bin", i + 1));
2129 let data = vec![*byte; PART_SIZE];
2130 std::fs::write(&src, &data).unwrap();
2131 expected.extend_from_slice(&data);
2132 store
2133 .mpu_put_part("b", "up", (i + 1) as u32, BodySource::File(src), "et")
2134 .unwrap();
2135 }
2136 let meta = sample_meta("k", expected.len() as u64);
2137 let body_ref = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
2138 let path = match body_ref {
2139 BodyRef::Disk { path, size, .. } => {
2140 assert_eq!(size, expected.len() as u64);
2141 path
2142 }
2143 _ => panic!("expected Disk"),
2144 };
2145 let actual = std::fs::read(&path).unwrap();
2146 assert_eq!(actual.len(), expected.len());
2147 assert_eq!(actual, expected);
2148 assert!(!store.mpu_dir("b", "up").exists());
2149 }
2150
2151 #[test]
2152 fn mpu_resumable_across_load() {
2153 let tmp = TempDir::new().unwrap();
2154 let store = new_store(&tmp);
2155 store
2156 .put_bucket_meta(
2157 "b",
2158 &BucketMeta {
2159 name: "b".to_string(),
2160 ..Default::default()
2161 },
2162 )
2163 .unwrap();
2164 store
2165 .mpu_create("b", "up", &sample_mpu_init("up", "k"))
2166 .unwrap();
2167 let p1 = Bytes::from_static(b"hello-");
2168 let p2 = Bytes::from_static(b"world-");
2169 store
2170 .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
2171 .unwrap();
2172 store
2173 .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
2174 .unwrap();
2175
2176 let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
2178 let loaded = store2.load().unwrap();
2179 let snap = loaded.buckets.get("b").unwrap();
2180 let m = snap.multipart_uploads.get("up").unwrap();
2181 assert_eq!(m.parts.len(), 2);
2182 assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
2183 assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");
2184
2185 let p3 = Bytes::from_static(b"again!");
2187 store2
2188 .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
2189 .unwrap();
2190 let mut expected = Vec::new();
2191 expected.extend_from_slice(&p1);
2192 expected.extend_from_slice(&p2);
2193 expected.extend_from_slice(&p3);
2194 let meta = sample_meta("k", expected.len() as u64);
2195 let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
2196 let path = match body_ref {
2197 BodyRef::Disk { path, .. } => path,
2198 _ => panic!(),
2199 };
2200 assert_eq!(std::fs::read(&path).unwrap(), expected);
2201 assert!(!store2.mpu_dir("b", "up").exists());
2202 }
2203
2204 #[test]
2205 fn tags_snapshot_roundtrip_via_store() {
2206 let tmp = TempDir::new().unwrap();
2207 let store = new_store(&tmp);
2208 store
2209 .put_bucket_meta(
2210 "b",
2211 &BucketMeta {
2212 name: "b".to_string(),
2213 ..Default::default()
2214 },
2215 )
2216 .unwrap();
2217 let mut tags = HashMap::new();
2218 tags.insert("env".to_string(), "prod".to_string());
2219 tags.insert("team".to_string(), "s3".to_string());
2220 let snap = TagsSnapshot { tags: tags.clone() };
2221 let payload = toml::to_string(&snap).unwrap();
2222 store
2223 .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
2224 .unwrap();
2225 let loaded = store.load().unwrap();
2226 let text = loaded
2227 .buckets
2228 .get("b")
2229 .unwrap()
2230 .subresources
2231 .get("tags.toml")
2232 .cloned()
2233 .unwrap();
2234 let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
2235 assert_eq!(decoded.tags, tags);
2236 }
2237
2238 #[test]
2239 fn acl_snapshot_roundtrip_via_store() {
2240 let tmp = TempDir::new().unwrap();
2241 let store = new_store(&tmp);
2242 store
2243 .put_bucket_meta(
2244 "b",
2245 &BucketMeta {
2246 name: "b".to_string(),
2247 ..Default::default()
2248 },
2249 )
2250 .unwrap();
2251 let snap = AclSnapshot {
2252 owner_id: "owner-abc".to_string(),
2253 grants: vec![
2254 AclGrantSnapshot {
2255 grantee_type: "CanonicalUser".to_string(),
2256 grantee_id: Some("owner-abc".to_string()),
2257 grantee_display_name: Some("owner".to_string()),
2258 grantee_uri: None,
2259 permission: "FULL_CONTROL".to_string(),
2260 },
2261 AclGrantSnapshot {
2262 grantee_type: "Group".to_string(),
2263 grantee_id: None,
2264 grantee_display_name: None,
2265 grantee_uri: Some(
2266 "http://acs.amazonaws.com/groups/global/AllUsers".to_string(),
2267 ),
2268 permission: "READ".to_string(),
2269 },
2270 ],
2271 };
2272 let payload = toml::to_string(&snap).unwrap();
2273 store
2274 .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
2275 .unwrap();
2276 let loaded = store.load().unwrap();
2277 let text = loaded
2278 .buckets
2279 .get("b")
2280 .unwrap()
2281 .subresources
2282 .get("acl.toml")
2283 .cloned()
2284 .unwrap();
2285 let decoded: AclSnapshot = toml::from_str(&text).unwrap();
2286 assert_eq!(decoded.owner_id, "owner-abc");
2287 assert_eq!(decoded.grants.len(), 2);
2288 assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
2289 assert_eq!(decoded.grants[1].grantee_type, "Group");
2290 }
2291
2292 #[test]
2293 fn inventory_snapshot_roundtrip_via_store() {
2294 let tmp = TempDir::new().unwrap();
2295 let store = new_store(&tmp);
2296 store
2297 .put_bucket_meta(
2298 "b",
2299 &BucketMeta {
2300 name: "b".to_string(),
2301 ..Default::default()
2302 },
2303 )
2304 .unwrap();
2305 let mut configs = HashMap::new();
2306 configs.insert(
2307 "inv-1".to_string(),
2308 "<InventoryConfiguration id=\"inv-1\"/>".to_string(),
2309 );
2310 configs.insert(
2311 "inv-2".to_string(),
2312 "<InventoryConfiguration id=\"inv-2\"/>".to_string(),
2313 );
2314 let snap = InventorySnapshot {
2315 configs: configs.clone(),
2316 };
2317 let payload = toml::to_string(&snap).unwrap();
2318 store
2319 .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
2320 .unwrap();
2321 let loaded = store.load().unwrap();
2322 let text = loaded
2323 .buckets
2324 .get("b")
2325 .unwrap()
2326 .subresources
2327 .get("inventory.toml")
2328 .cloned()
2329 .unwrap();
2330 let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
2331 assert_eq!(decoded.configs, configs);
2332 }
2333
2334 #[test]
2335 fn legacy_versioning_file_is_read() {
2336 let tmp = TempDir::new().unwrap();
2337 let store = new_store(&tmp);
2338 store
2340 .put_bucket_meta(
2341 "b",
2342 &BucketMeta {
2343 name: "b".to_string(),
2344 ..Default::default()
2345 },
2346 )
2347 .unwrap();
2348 let path = store.bucket_dir("b").join("versioning.toml");
2350 std::fs::write(&path, "Enabled").unwrap();
2351 let loaded = store.load().unwrap();
2352 let snap = loaded.buckets.get("b").unwrap();
2353 assert_eq!(snap.meta.versioning.as_deref(), Some("Enabled"));
2354 }
2355}