1use std::collections::{BTreeMap, HashMap};
2use std::path::PathBuf;
3
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Top-level state snapshot returned by [`S3Store::load`].
///
/// Every field is marked `#[serde(default)]`, so a serialized form that omits
/// any of them still deserializes.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct S3State {
    /// Account identifier; empty when absent from the serialized form.
    #[serde(default)]
    pub account_id: String,
    /// Region name; empty when absent from the serialized form.
    #[serde(default)]
    pub region: String,
    /// Per-bucket snapshots. `DiskS3Store::load` keys this map by
    /// `BucketMeta::name`.
    #[serde(default)]
    pub buckets: HashMap<String, BucketSnapshot>,
}
18
/// Everything loaded for a single bucket: metadata, objects, raw subresource
/// payloads and in-flight multipart uploads.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketSnapshot {
    /// Bucket-level metadata. Has no serde default, so it must be present
    /// when deserializing.
    pub meta: BucketMeta,
    /// Current object per key.
    #[serde(default)]
    pub objects: HashMap<String, LoadedObject>,
    /// Version list per key. `DiskS3Store::load` sorts each list by
    /// `last_modified` in ascending order.
    #[serde(default)]
    pub object_versions: HashMap<String, Vec<LoadedObject>>,
    /// Raw subresource payloads. `DiskS3Store::load` keys this map by the
    /// subresource file name (for example `"tags.toml"`).
    #[serde(default)]
    pub subresources: HashMap<String, String>,
    /// Multipart uploads; `DiskS3Store::load` keys this map by upload id.
    #[serde(default)]
    pub multipart_uploads: HashMap<String, LoadedMpu>,
}
31
/// A multipart upload as loaded from a store: its initiation record plus the
/// parts found for it.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedMpu {
    /// Initiation record of the upload (required when deserializing).
    pub init: MpuInit,
    /// Parts keyed by part number; the `BTreeMap` keeps them in ascending
    /// part-number order.
    #[serde(default)]
    pub parts: BTreeMap<u32, LoadedPart>,
}
38
/// One part of a multipart upload: its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedPart {
    /// Part metadata (required when deserializing).
    pub meta: UploadPartMeta,
    /// Where the part body lives; falls back to `BodyRef::default()` (an empty
    /// in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
45
/// One object (or object version): its metadata and a reference to its body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct LoadedObject {
    /// Object metadata (required when deserializing).
    pub meta: ObjectMeta,
    /// Where the object body lives; falls back to `BodyRef::default()` (an
    /// empty in-memory body) when absent.
    #[serde(default)]
    pub body: BodyRef,
}
52
/// Bucket-level metadata; `DiskS3Store` persists it as `meta.toml` inside the
/// bucket directory.
///
/// Every field has a serde default, so older or partial files still load.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BucketMeta {
    /// Bucket name. `DiskS3Store::load` uses it as the key in
    /// `S3State::buckets`.
    #[serde(default)]
    pub name: String,
    /// Creation timestamp; `default_time()` is used when the field is missing
    /// from the serialized form.
    #[serde(default = "default_time")]
    pub creation_date: DateTime<Utc>,
    /// Region recorded for the bucket.
    #[serde(default)]
    pub region: String,
    /// Versioning status. When this is `None`, `DiskS3Store::load` fills it
    /// from the trimmed contents of `versioning.toml` if that file is
    /// non-empty.
    #[serde(default)]
    pub versioning: Option<String>,
    /// Bucket ACL payload, if any (format not visible in this file).
    #[serde(default)]
    pub acl: Option<String>,
    /// Owner id associated with the ACL.
    #[serde(default)]
    pub acl_owner_id: String,
    /// Transfer-acceleration status string, if set.
    #[serde(default)]
    pub accelerate_status: Option<String>,
    /// EventBridge flag; defaults to `false` when missing.
    #[serde(default)]
    pub eventbridge_enabled: bool,
}
72
73fn default_time() -> DateTime<Utc> {
74 DateTime::<Utc>::MIN_UTC
75}
76
/// Serialized form of a single ACL grant.
///
/// `grantee_type` and `permission` have no serde default and must be present
/// when deserializing; the grantee details are optional.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclGrantSnapshot {
    /// Kind of grantee (the set of accepted values is not visible here).
    pub grantee_type: String,
    /// Grantee identifier, if any.
    #[serde(default)]
    pub grantee_id: Option<String>,
    /// Grantee display name, if any.
    #[serde(default)]
    pub grantee_display_name: Option<String>,
    /// Grantee URI, if any.
    #[serde(default)]
    pub grantee_uri: Option<String>,
    /// Permission granted to the grantee.
    pub permission: String,
}
88
/// Serialized tag set: a map from tag key to tag value.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TagsSnapshot {
    /// Tag key/value pairs; empty when absent from the serialized form.
    #[serde(default)]
    pub tags: HashMap<String, String>,
}
94
/// Serialized ACL: an owner id plus a list of grants.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AclSnapshot {
    /// Owner id; empty when absent from the serialized form.
    #[serde(default)]
    pub owner_id: String,
    /// Grants in the order they were serialized.
    #[serde(default)]
    pub grants: Vec<AclGrantSnapshot>,
}
102
/// Serialized inventory configurations.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct InventorySnapshot {
    /// Configuration payloads by string key (presumably the configuration id
    /// -- confirm against the code that builds this map).
    #[serde(default)]
    pub configs: HashMap<String, String>,
}
108
/// Per-object (or per-version) metadata; `DiskS3Store` persists it as
/// `<version-tag>.toml` next to the body file.
///
/// Every field has a serde default, so partial files still deserialize.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Object key. `DiskS3Store::load` uses it as the key of
    /// `BucketSnapshot::objects` / `object_versions`.
    #[serde(default)]
    pub key: String,
    /// Content type recorded for the object.
    #[serde(default)]
    pub content_type: String,
    /// Entity tag recorded for the object.
    #[serde(default)]
    pub etag: String,
    /// Size recorded in the metadata (the store reports body size separately
    /// through `BodyRef::size`).
    #[serde(default)]
    pub size: u64,
    /// Last-modified timestamp; `default_time()` when missing. `DiskS3Store::load`
    /// orders versions of a key by this value.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
    /// Metadata key/value pairs stored with the object.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Object tags.
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// Storage class string.
    #[serde(default)]
    pub storage_class: String,
    /// ACL grants attached to the object.
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    /// Owner id for the object ACL, if recorded.
    #[serde(default)]
    pub acl_owner_id: Option<String>,
    /// Number of parts, if recorded (presumably set for multipart objects --
    /// confirm against the writer).
    #[serde(default)]
    pub parts_count: Option<u32>,
    /// Pairs of `(u32, u64)`; presumably (part number, part size) -- confirm
    /// against the writer.
    #[serde(default)]
    pub part_sizes: Option<Vec<(u32, u64)>>,
    /// Server-side-encryption algorithm, if recorded.
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    /// KMS key id used for server-side encryption, if recorded.
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Bucket-key flag, if recorded.
    #[serde(default)]
    pub bucket_key_enabled: Option<bool>,
    /// Version id. `DiskS3Store::load` treats an entry as the unversioned
    /// current object only when this is `None` and the file tag is `"null"`.
    #[serde(default)]
    pub version_id: Option<String>,
    /// Marks a delete marker. `DiskS3Store` stores no body for such entries
    /// and represents the body as empty in-memory bytes.
    #[serde(default)]
    pub is_delete_marker: bool,
    /// Restore-in-progress flag, if recorded.
    #[serde(default)]
    pub restore_ongoing: Option<bool>,
    /// Restore expiry, kept as an unparsed string.
    #[serde(default)]
    pub restore_expiry: Option<String>,
    /// Checksum algorithm name, if recorded.
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
    /// Checksum value, if recorded.
    #[serde(default)]
    pub checksum_value: Option<String>,
    /// Object-lock mode, if recorded.
    #[serde(default)]
    pub lock_mode: Option<String>,
    /// Object-lock retain-until timestamp, if recorded.
    #[serde(default)]
    pub lock_retain_until: Option<DateTime<Utc>>,
    /// Object-lock legal-hold status, if recorded.
    #[serde(default)]
    pub lock_legal_hold: Option<String>,
    /// Content encoding, if recorded.
    #[serde(default)]
    pub content_encoding: Option<String>,
    /// Website redirect location, if recorded.
    #[serde(default)]
    pub website_redirect_location: Option<String>,
}
164
/// Initiation record of a multipart upload; `DiskS3Store` persists it as
/// `init.toml` inside the upload directory.
///
/// `upload_id` and `key` have no serde default and must be present when
/// deserializing.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MpuInit {
    /// Upload id. `DiskS3Store::load` uses it as the key of
    /// `BucketSnapshot::multipart_uploads`.
    pub upload_id: String,
    /// Key the upload is targeting.
    pub key: String,
    /// Initiation timestamp; `default_time()` when missing.
    #[serde(default = "default_time")]
    pub initiated: DateTime<Utc>,
    /// Metadata key/value pairs supplied at initiation.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Content type supplied at initiation.
    #[serde(default)]
    pub content_type: String,
    /// Storage class string supplied at initiation.
    #[serde(default)]
    pub storage_class: String,
    /// Server-side-encryption algorithm, if recorded.
    #[serde(default)]
    pub sse_algorithm: Option<String>,
    /// KMS key id for server-side encryption, if recorded.
    #[serde(default)]
    pub sse_kms_key_id: Option<String>,
    /// Tagging payload kept as a single string (format not visible here).
    #[serde(default)]
    pub tagging: Option<String>,
    /// ACL grants supplied at initiation.
    #[serde(default)]
    pub acl_grants: Vec<AclGrantSnapshot>,
    /// Checksum algorithm name, if recorded.
    #[serde(default)]
    pub checksum_algorithm: Option<String>,
}
188
/// Metadata of one uploaded part; `DiskS3Store::mpu_put_part` writes it as
/// `<part_number>.toml` next to the part body.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct UploadPartMeta {
    /// Part number (required when deserializing).
    pub part_number: u32,
    /// Entity tag of the part (required when deserializing).
    pub etag: String,
    /// Size of the part body in bytes (required when deserializing).
    pub size: u64,
    /// Timestamp of the write; `default_time()` when missing.
    #[serde(default = "default_time")]
    pub last_modified: DateTime<Utc>,
}
197
/// Kinds of bucket subresource a store can persist.
///
/// `DiskS3Store::subresource_filename` maps each variant to its own file name,
/// and `ALL_SUBRESOURCES` lists every variant; keep all three in sync when
/// adding a variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BucketSubresource {
    Tags,
    Lifecycle,
    Cors,
    Policy,
    Notification,
    Logging,
    Website,
    PublicAccessBlock,
    ObjectLock,
    Replication,
    Ownership,
    Inventory,
    Encryption,
    Versioning,
    Acl,
    Accelerate,
    Analytics,
    IntelligentTiering,
    Metrics,
    RequestPayment,
    Abac,
    MetadataConfiguration,
    MetadataTableConfiguration,
}
224
/// Every [`BucketSubresource`] variant, in declaration order.
///
/// `DiskS3Store::load` iterates this slice to discover which subresource
/// files exist for a bucket, so a variant missing here is never loaded.
pub const ALL_SUBRESOURCES: &[BucketSubresource] = &[
    BucketSubresource::Tags,
    BucketSubresource::Lifecycle,
    BucketSubresource::Cors,
    BucketSubresource::Policy,
    BucketSubresource::Notification,
    BucketSubresource::Logging,
    BucketSubresource::Website,
    BucketSubresource::PublicAccessBlock,
    BucketSubresource::ObjectLock,
    BucketSubresource::Replication,
    BucketSubresource::Ownership,
    BucketSubresource::Inventory,
    BucketSubresource::Encryption,
    BucketSubresource::Versioning,
    BucketSubresource::Acl,
    BucketSubresource::Accelerate,
    BucketSubresource::Analytics,
    BucketSubresource::IntelligentTiering,
    BucketSubresource::Metrics,
    BucketSubresource::RequestPayment,
    BucketSubresource::Abac,
    BucketSubresource::MetadataConfiguration,
    BucketSubresource::MetadataTableConfiguration,
];
250
/// Reference to an object or part body: either bytes held in memory or a file
/// on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BodyRef {
    /// Body held in memory. `#[serde(skip)]` excludes this variant from
    /// serialization and deserialization.
    #[serde(skip)]
    Memory(Bytes),
    /// Body stored in a file.
    Disk {
        /// Bucket component of the cache key used by `DiskS3Store`.
        bucket: String,
        /// Key component of the cache key used by `DiskS3Store`.
        key: String,
        /// Version component of the cache key; `None` for the unversioned
        /// slot.
        #[serde(default)]
        version: Option<String>,
        /// Path of the body file.
        path: PathBuf,
        /// Body size in bytes as recorded when the reference was created.
        size: u64,
    },
}
264
265impl BodyRef {
266 pub fn size(&self) -> u64 {
267 match self {
268 BodyRef::Memory(b) => b.len() as u64,
269 BodyRef::Disk { size, .. } => *size,
270 }
271 }
272}
273
274impl Default for BodyRef {
275 fn default() -> Self {
276 BodyRef::Memory(Bytes::new())
277 }
278}
279
/// Source of body data handed to a store when writing an object or part.
#[derive(Debug)]
pub enum BodySource {
    /// Body bytes already in memory.
    Bytes(Bytes),
    /// Body in a file; `DiskS3Store` passes the path to
    /// `crate::atomic::write_atomic_from_file` (presumably consuming the
    /// source file -- confirm in `crate::atomic`).
    File(PathBuf),
    /// Body in a file; `DiskS3Store` passes the path to
    /// `crate::atomic::write_atomic_copy_from_file` (presumably leaving the
    /// source in place -- confirm in `crate::atomic`).
    FileCopy(PathBuf),
}
291
/// Errors returned by [`S3Store`] operations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Underlying I/O failure; `#[from]` lets `?` convert `std::io::Error`
    /// automatically.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// (De)serialization failure, carried as the rendered error message.
    #[error("serialization error: {0}")]
    Serde(String),
    /// The operation is not available on this store implementation.
    #[error("not supported by this store")]
    NotSupported,
    /// Any other failure, described by a free-form message.
    #[error("{0}")]
    Other(String),
}
303
/// Result alias used by every [`S3Store`] operation.
pub type StoreResult<T> = Result<T, StoreError>;
305
/// Persistence backend for S3 state.
///
/// Two implementations are visible in this file: `MemoryS3Store`, whose write
/// operations are no-ops, and `DiskS3Store`, which persists everything under
/// a root directory. Implementations must be `Send + Sync`.
pub trait S3Store: Send + Sync {
    /// Loads the complete persisted state.
    fn load(&self) -> StoreResult<S3State>;

    /// Persists the metadata of `bucket`.
    fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()>;
    /// Persists the raw `payload` of one subresource of `bucket`.
    fn put_bucket_subresource(
        &self,
        bucket: &str,
        kind: BucketSubresource,
        payload: &str,
    ) -> StoreResult<()>;
    /// Removes one subresource of `bucket`.
    fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()>;
    /// Removes `bucket` and everything stored for it.
    fn delete_bucket(&self, bucket: &str) -> StoreResult<()>;

    /// Stores an object body together with its metadata and returns a
    /// reference to the stored body. `version` is `None` for the unversioned
    /// slot.
    fn put_object(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        body: BodySource,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
    /// Rewrites only the metadata of an object, leaving its body untouched.
    fn put_object_meta(
        &self,
        bucket: &str,
        key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<()>;
    /// Removes the body and metadata of an object (or one version of it).
    fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()>;
    /// Returns the full contents of the body that `body` refers to.
    fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes>;

    /// Records the start of a multipart upload.
    fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()>;
    /// Stores one part of a multipart upload.
    fn mpu_put_part(
        &self,
        bucket: &str,
        upload_id: &str,
        part_number: u32,
        body: BodySource,
        etag: &str,
    ) -> StoreResult<()>;
    /// Discards a multipart upload and its parts.
    fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()>;
    /// Finishes a multipart upload as the object `final_key` and returns a
    /// reference to the resulting body.
    fn mpu_complete(
        &self,
        bucket: &str,
        upload_id: &str,
        final_key: &str,
        version: Option<&str>,
        meta: &ObjectMeta,
    ) -> StoreResult<BodyRef>;
}
356
/// Store that persists nothing: a stateless unit type whose `S3Store`
/// operations keep bodies in memory only.
pub struct MemoryS3Store;

impl MemoryS3Store {
    /// Creates the (stateless) in-memory store.
    pub fn new() -> Self {
        MemoryS3Store
    }
}

impl Default for MemoryS3Store {
    /// Same as [`MemoryS3Store::new`].
    fn default() -> Self {
        MemoryS3Store::new()
    }
}
370
371impl S3Store for MemoryS3Store {
372 fn load(&self) -> StoreResult<S3State> {
373 Ok(S3State::default())
374 }
375
376 fn put_bucket_meta(&self, _bucket: &str, _meta: &BucketMeta) -> StoreResult<()> {
377 Ok(())
378 }
379 fn put_bucket_subresource(
380 &self,
381 _bucket: &str,
382 _kind: BucketSubresource,
383 _payload: &str,
384 ) -> StoreResult<()> {
385 Ok(())
386 }
387 fn delete_bucket_subresource(
388 &self,
389 _bucket: &str,
390 _kind: BucketSubresource,
391 ) -> StoreResult<()> {
392 Ok(())
393 }
394 fn delete_bucket(&self, _bucket: &str) -> StoreResult<()> {
395 Ok(())
396 }
397
398 fn put_object(
399 &self,
400 _bucket: &str,
401 _key: &str,
402 _version: Option<&str>,
403 body: BodySource,
404 _meta: &ObjectMeta,
405 ) -> StoreResult<BodyRef> {
406 match body {
407 BodySource::Bytes(b) => Ok(BodyRef::Memory(b)),
408 BodySource::File(_) | BodySource::FileCopy(_) => Err(StoreError::Other(
409 "file-backed put not supported in memory mode".to_string(),
410 )),
411 }
412 }
413 fn put_object_meta(
414 &self,
415 _bucket: &str,
416 _key: &str,
417 _version: Option<&str>,
418 _meta: &ObjectMeta,
419 ) -> StoreResult<()> {
420 Ok(())
421 }
422 fn delete_object(&self, _bucket: &str, _key: &str, _version: Option<&str>) -> StoreResult<()> {
423 Ok(())
424 }
425 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
426 match body {
427 BodyRef::Memory(b) => Ok(b.clone()),
428 BodyRef::Disk { .. } => {
429 panic!("MemoryS3Store cannot open Disk-backed BodyRef")
430 }
431 }
432 }
433
434 fn mpu_create(&self, _bucket: &str, _upload_id: &str, _init: &MpuInit) -> StoreResult<()> {
435 Ok(())
436 }
437 fn mpu_put_part(
438 &self,
439 _bucket: &str,
440 _upload_id: &str,
441 _part_number: u32,
442 _body: BodySource,
443 _etag: &str,
444 ) -> StoreResult<()> {
445 Ok(())
446 }
447 fn mpu_abort(&self, _bucket: &str, _upload_id: &str) -> StoreResult<()> {
448 Ok(())
449 }
450 fn mpu_complete(
451 &self,
452 _bucket: &str,
453 _upload_id: &str,
454 _final_key: &str,
455 _version: Option<&str>,
456 _meta: &ObjectMeta,
457 ) -> StoreResult<BodyRef> {
458 Ok(BodyRef::Memory(Bytes::new()))
459 }
460}
461
/// Store that persists state as files under a root directory and keeps
/// object bodies in a shared `BodyCache`.
pub struct DiskS3Store {
    /// Root directory; buckets live under `<root>/buckets`.
    root: PathBuf,
    /// Shared body cache, consulted by `open_object_body` and kept in step
    /// with writes and deletes.
    cache: std::sync::Arc<crate::cache::BodyCache>,
}
466
467impl DiskS3Store {
468 pub fn new(root: PathBuf, cache: std::sync::Arc<crate::cache::BodyCache>) -> Self {
469 Self { root, cache }
470 }
471
472 fn buckets_dir(&self) -> PathBuf {
473 self.root.join("buckets")
474 }
475
476 fn bucket_dir(&self, bucket: &str) -> PathBuf {
477 self.buckets_dir()
478 .join(crate::key_escape::escape_key_segment(bucket))
479 }
480
481 fn object_dir(&self, bucket: &str, key: &str) -> PathBuf {
482 self.bucket_dir(bucket)
483 .join("objects")
484 .join(crate::key_escape::escape_key_segment(key))
485 }
486
487 fn version_tag(version: Option<&str>) -> String {
488 version.unwrap_or("null").to_string()
489 }
490
491 fn object_paths(
492 &self,
493 bucket: &str,
494 key: &str,
495 version: Option<&str>,
496 ) -> (PathBuf, PathBuf, PathBuf) {
497 let dir = self.object_dir(bucket, key);
498 let tag = Self::version_tag(version);
499 let bin = dir.join(format!("{}.bin", tag));
500 let toml = dir.join(format!("{}.toml", tag));
501 (dir, bin, toml)
502 }
503
504 fn subresource_filename(kind: BucketSubresource) -> &'static str {
505 match kind {
506 BucketSubresource::Tags => "tags.toml",
507 BucketSubresource::Lifecycle => "lifecycle.toml",
508 BucketSubresource::Cors => "cors.toml",
509 BucketSubresource::Policy => "policy.toml",
510 BucketSubresource::Notification => "notification.toml",
511 BucketSubresource::Logging => "logging.toml",
512 BucketSubresource::Website => "website.toml",
513 BucketSubresource::PublicAccessBlock => "public_access_block.toml",
514 BucketSubresource::ObjectLock => "object_lock.toml",
515 BucketSubresource::Replication => "replication.toml",
516 BucketSubresource::Ownership => "ownership.toml",
517 BucketSubresource::Inventory => "inventory.toml",
518 BucketSubresource::Encryption => "encryption.toml",
519 BucketSubresource::Versioning => "versioning.toml",
520 BucketSubresource::Acl => "acl.toml",
521 BucketSubresource::Accelerate => "accelerate.toml",
522 BucketSubresource::Analytics => "analytics.toml",
523 BucketSubresource::IntelligentTiering => "intelligent_tiering.toml",
524 BucketSubresource::Metrics => "metrics.toml",
525 BucketSubresource::RequestPayment => "request_payment.toml",
526 BucketSubresource::Abac => "abac.toml",
527 BucketSubresource::MetadataConfiguration => "metadata_configuration.toml",
528 BucketSubresource::MetadataTableConfiguration => "metadata_table_configuration.toml",
529 }
530 }
531
532 fn cleanup_empty(dir: &std::path::Path) {
533 let _ = std::fs::remove_dir(dir);
534 }
535
536 fn mpu_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
537 self.bucket_dir(bucket)
538 .join("mpu")
539 .join(crate::key_escape::escape_key_segment(upload_id))
540 }
541
542 fn mpu_parts_dir(&self, bucket: &str, upload_id: &str) -> PathBuf {
543 self.mpu_dir(bucket, upload_id).join("parts")
544 }
545
546 fn mpu_part_bin(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
547 self.mpu_parts_dir(bucket, upload_id)
548 .join(format!("{}.bin", part_number))
549 }
550
551 fn mpu_part_toml(&self, bucket: &str, upload_id: &str, part_number: u32) -> PathBuf {
552 self.mpu_parts_dir(bucket, upload_id)
553 .join(format!("{}.toml", part_number))
554 }
555
556 fn mpu_body_key(bucket: &str, upload_id: &str, part_number: u32) -> crate::cache::BodyKey {
557 crate::cache::BodyKey::new(
558 bucket.to_string(),
559 format!("__mpu__/{}", upload_id),
560 Some(format!("part-{}", part_number)),
561 )
562 }
563}
564
565fn io_other(msg: impl Into<String>) -> StoreError {
566 StoreError::Other(msg.into())
567}
568
569impl S3Store for DiskS3Store {
570 fn load(&self) -> StoreResult<S3State> {
571 let mut state = S3State::default();
572 let buckets_dir = self.buckets_dir();
573 if !buckets_dir.exists() {
574 return Ok(state);
575 }
576 for entry in std::fs::read_dir(&buckets_dir)? {
577 let entry = entry?;
578 if !entry.file_type()?.is_dir() {
579 continue;
580 }
581 let bdir = entry.path();
582 let meta_path = bdir.join("meta.toml");
583 if !meta_path.exists() {
584 continue;
585 }
586 let meta_text = std::fs::read_to_string(&meta_path)?;
587 let mut meta: BucketMeta =
588 toml::from_str(&meta_text).map_err(|e| StoreError::Serde(e.to_string()))?;
589 let mut snap = BucketSnapshot {
590 meta: meta.clone(),
591 objects: HashMap::new(),
592 object_versions: HashMap::new(),
593 subresources: HashMap::new(),
594 multipart_uploads: HashMap::new(),
595 };
596
597 for kind in ALL_SUBRESOURCES {
598 let fname = Self::subresource_filename(*kind);
599 let path = bdir.join(fname);
600 if path.exists() {
601 let text = std::fs::read_to_string(&path)?;
602 if *kind == BucketSubresource::Versioning && snap.meta.versioning.is_none() {
603 let stripped = text.trim();
604 if !stripped.is_empty() {
605 snap.meta.versioning = Some(stripped.to_string());
606 meta.versioning = snap.meta.versioning.clone();
607 }
608 }
609 snap.subresources.insert(fname.to_string(), text);
610 }
611 }
612
613 let objects_root = bdir.join("objects");
614 if objects_root.exists() {
615 for okey_entry in std::fs::read_dir(&objects_root)? {
616 let okey_entry = okey_entry?;
617 if !okey_entry.file_type()?.is_dir() {
618 continue;
619 }
620 let key_dir = okey_entry.path();
621 let mut versioned: Vec<LoadedObject> = Vec::new();
622 let mut key_name: Option<String> = None;
623 for version_entry in std::fs::read_dir(&key_dir)? {
624 let version_entry = version_entry?;
625 let path = version_entry.path();
626 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
627 continue;
628 };
629 if !fname.ends_with(".toml") {
630 continue;
631 }
632 let version_tag = &fname[..fname.len() - 5];
633 let toml_text = std::fs::read_to_string(&path)?;
634 let obj_meta: ObjectMeta = toml::from_str(&toml_text)
635 .map_err(|e| StoreError::Serde(e.to_string()))?;
636 let bin_path = key_dir.join(format!("{}.bin", version_tag));
637 let (body, size) = if obj_meta.is_delete_marker {
638 (BodyRef::Memory(Bytes::new()), 0u64)
639 } else if bin_path.exists() {
640 let sz = std::fs::metadata(&bin_path)?.len();
641 (
642 BodyRef::Disk {
643 bucket: meta.name.clone(),
644 key: obj_meta.key.clone(),
645 version: if version_tag == "null" {
646 None
647 } else {
648 Some(version_tag.to_string())
649 },
650 path: bin_path,
651 size: sz,
652 },
653 sz,
654 )
655 } else {
656 return Err(StoreError::Other(format!(
661 "missing body file: {}",
662 bin_path.display()
663 )));
664 };
665 let _ = size;
666 key_name.get_or_insert_with(|| obj_meta.key.clone());
667 if version_tag == "null" && obj_meta.version_id.is_none() {
668 snap.objects.insert(
669 obj_meta.key.clone(),
670 LoadedObject {
671 meta: obj_meta,
672 body,
673 },
674 );
675 } else {
676 versioned.push(LoadedObject {
677 meta: obj_meta,
678 body,
679 });
680 }
681 }
682 if !versioned.is_empty() {
683 versioned.sort_by_key(|v| v.meta.last_modified);
684 if let Some(key) = key_name {
685 match versioned.last() {
692 Some(newest) if newest.meta.is_delete_marker => {
693 snap.objects.remove(&key);
694 }
695 Some(newest) => {
696 snap.objects.insert(key.clone(), newest.clone());
697 }
698 None => {}
699 }
700 snap.object_versions.insert(key, versioned);
701 }
702 }
703 }
704 }
705
706 let mpu_root = bdir.join("mpu");
707 if mpu_root.exists() {
708 for upload_entry in std::fs::read_dir(&mpu_root)? {
709 let upload_entry = upload_entry?;
710 if !upload_entry.file_type()?.is_dir() {
711 continue;
712 }
713 let upload_dir = upload_entry.path();
714 let init_path = upload_dir.join("init.toml");
715 if !init_path.exists() {
716 continue;
717 }
718 let init_text = std::fs::read_to_string(&init_path)?;
719 let init: MpuInit =
720 toml::from_str(&init_text).map_err(|e| StoreError::Serde(e.to_string()))?;
721 let mut loaded_parts: BTreeMap<u32, LoadedPart> = BTreeMap::new();
722 let parts_dir = upload_dir.join("parts");
723 if parts_dir.exists() {
724 for part_entry in std::fs::read_dir(&parts_dir)? {
725 let part_entry = part_entry?;
726 let path = part_entry.path();
727 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
728 continue;
729 };
730 if !fname.ends_with(".toml") {
731 continue;
732 }
733 let stem = &fname[..fname.len() - 5];
734 let Ok(part_number) = stem.parse::<u32>() else {
735 continue;
736 };
737 let toml_text = std::fs::read_to_string(&path)?;
738 let part_meta: UploadPartMeta = toml::from_str(&toml_text)
739 .map_err(|e| StoreError::Serde(e.to_string()))?;
740 let bin_path = parts_dir.join(format!("{}.bin", part_number));
741 if !bin_path.exists() {
742 return Err(StoreError::Other(format!(
743 "missing multipart part body file: {}",
744 bin_path.display()
745 )));
746 }
747 let sz = std::fs::metadata(&bin_path)?.len();
748 let body = BodyRef::Disk {
749 bucket: meta.name.clone(),
750 key: format!("__mpu__/{}", init.upload_id),
751 version: Some(format!("part-{}", part_number)),
752 path: bin_path,
753 size: sz,
754 };
755 loaded_parts.insert(
756 part_number,
757 LoadedPart {
758 meta: part_meta,
759 body,
760 },
761 );
762 }
763 }
764 snap.multipart_uploads.insert(
765 init.upload_id.clone(),
766 LoadedMpu {
767 init,
768 parts: loaded_parts,
769 },
770 );
771 }
772 }
773
774 state.buckets.insert(meta.name.clone(), snap);
775 }
776 Ok(state)
777 }
778
779 fn put_bucket_meta(&self, bucket: &str, meta: &BucketMeta) -> StoreResult<()> {
780 let dir = self.bucket_dir(bucket);
781 std::fs::create_dir_all(&dir)?;
782 crate::atomic::write_atomic_toml(&dir.join("meta.toml"), meta)?;
783 Ok(())
784 }
785
786 fn put_bucket_subresource(
787 &self,
788 bucket: &str,
789 kind: BucketSubresource,
790 payload: &str,
791 ) -> StoreResult<()> {
792 let dir = self.bucket_dir(bucket);
793 std::fs::create_dir_all(&dir)?;
794 let path = dir.join(Self::subresource_filename(kind));
795 crate::atomic::write_atomic_bytes(&path, payload.as_bytes())?;
796 Ok(())
797 }
798
799 fn delete_bucket_subresource(&self, bucket: &str, kind: BucketSubresource) -> StoreResult<()> {
800 let path = self
801 .bucket_dir(bucket)
802 .join(Self::subresource_filename(kind));
803 match std::fs::remove_file(&path) {
804 Ok(_) => Ok(()),
805 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
806 Err(e) => Err(e.into()),
807 }
808 }
809
810 fn delete_bucket(&self, bucket: &str) -> StoreResult<()> {
811 let dir = self.bucket_dir(bucket);
812 match std::fs::remove_dir_all(&dir) {
813 Ok(_) => Ok(()),
814 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
815 Err(e) => Err(e.into()),
816 }
817 }
818
819 fn put_object(
820 &self,
821 bucket: &str,
822 key: &str,
823 version: Option<&str>,
824 body: BodySource,
825 meta: &ObjectMeta,
826 ) -> StoreResult<BodyRef> {
827 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
828 std::fs::create_dir_all(&dir)?;
829
830 if meta.is_delete_marker {
831 crate::atomic::write_atomic_toml(&toml_path, meta)?;
832 return Ok(BodyRef::Memory(Bytes::new()));
833 }
834
835 let size: u64;
836 let bytes_for_cache: Option<Bytes>;
837 match body {
838 BodySource::Bytes(b) => {
839 size = b.len() as u64;
840 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
841 bytes_for_cache = Some(b);
842 }
843 BodySource::File(src) => {
844 let src_size = std::fs::metadata(&src)?.len();
845 size = src_size;
846 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
847 bytes_for_cache = None;
848 }
849 BodySource::FileCopy(src) => {
850 let src_size = std::fs::metadata(&src)?.len();
851 size = src_size;
852 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
853 bytes_for_cache = None;
854 }
855 }
856
857 crate::atomic::write_atomic_toml(&toml_path, meta)?;
858
859 let body_key = crate::cache::BodyKey::new(
860 bucket.to_string(),
861 key.to_string(),
862 version.map(|s| s.to_string()),
863 );
864 if let Some(b) = bytes_for_cache {
865 self.cache.insert(body_key, b);
866 } else {
867 self.cache.invalidate(&crate::cache::BodyKey::new(
868 bucket.to_string(),
869 key.to_string(),
870 version.map(|s| s.to_string()),
871 ));
872 }
873
874 Ok(BodyRef::Disk {
875 bucket: bucket.to_string(),
876 key: key.to_string(),
877 version: version.map(|s| s.to_string()),
878 path: bin_path,
879 size,
880 })
881 }
882
883 fn put_object_meta(
884 &self,
885 bucket: &str,
886 key: &str,
887 version: Option<&str>,
888 meta: &ObjectMeta,
889 ) -> StoreResult<()> {
890 let (dir, _bin, toml_path) = self.object_paths(bucket, key, version);
891 std::fs::create_dir_all(&dir)?;
892 crate::atomic::write_atomic_toml(&toml_path, meta)?;
893 Ok(())
894 }
895
896 fn delete_object(&self, bucket: &str, key: &str, version: Option<&str>) -> StoreResult<()> {
897 let (dir, bin_path, toml_path) = self.object_paths(bucket, key, version);
898 for p in [&bin_path, &toml_path] {
899 match std::fs::remove_file(p) {
900 Ok(_) => {}
901 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
902 Err(e) => return Err(e.into()),
903 }
904 }
905 Self::cleanup_empty(&dir);
906
907 self.cache.invalidate(&crate::cache::BodyKey::new(
908 bucket.to_string(),
909 key.to_string(),
910 version.map(|s| s.to_string()),
911 ));
912 Ok(())
913 }
914
915 fn open_object_body(&self, body: &BodyRef) -> StoreResult<Bytes> {
916 match body {
917 BodyRef::Memory(b) => Ok(b.clone()),
918 BodyRef::Disk {
919 bucket,
920 key,
921 version,
922 path,
923 size: _,
924 } => {
925 let body_key =
926 crate::cache::BodyKey::new(bucket.clone(), key.clone(), version.clone());
927 if let Some(bytes) = self.cache.get(&body_key) {
928 return Ok(bytes);
929 }
930 let bytes = Bytes::from(std::fs::read(path)?);
931 self.cache.insert(body_key, bytes.clone());
932 Ok(bytes)
933 }
934 }
935 }
936
937 fn mpu_create(&self, bucket: &str, upload_id: &str, init: &MpuInit) -> StoreResult<()> {
938 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
939 std::fs::create_dir_all(&parts_dir)?;
940 let init_path = self.mpu_dir(bucket, upload_id).join("init.toml");
941 crate::atomic::write_atomic_toml(&init_path, init)?;
942 Ok(())
943 }
944
945 fn mpu_put_part(
946 &self,
947 bucket: &str,
948 upload_id: &str,
949 part_number: u32,
950 body: BodySource,
951 etag: &str,
952 ) -> StoreResult<()> {
953 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
954 std::fs::create_dir_all(&parts_dir)?;
955 let bin_path = self.mpu_part_bin(bucket, upload_id, part_number);
956 let toml_path = self.mpu_part_toml(bucket, upload_id, part_number);
957
958 let size: u64 = match body {
959 BodySource::Bytes(b) => {
960 let n = b.len() as u64;
961 crate::atomic::write_atomic_bytes(&bin_path, &b)?;
962 let cache_key = Self::mpu_body_key(bucket, upload_id, part_number);
963 self.cache.insert(cache_key, b);
964 n
965 }
966 BodySource::File(src) => {
967 let n = std::fs::metadata(&src)?.len();
968 crate::atomic::write_atomic_from_file(&src, &bin_path)?;
969 self.cache
970 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
971 n
972 }
973 BodySource::FileCopy(src) => {
974 let n = std::fs::metadata(&src)?.len();
975 crate::atomic::write_atomic_copy_from_file(&src, &bin_path)?;
976 self.cache
977 .invalidate(&Self::mpu_body_key(bucket, upload_id, part_number));
978 n
979 }
980 };
981
982 let meta = UploadPartMeta {
983 part_number,
984 etag: etag.to_string(),
985 size,
986 last_modified: Utc::now(),
987 };
988 crate::atomic::write_atomic_toml(&toml_path, &meta)?;
989 Ok(())
990 }
991
992 fn mpu_abort(&self, bucket: &str, upload_id: &str) -> StoreResult<()> {
993 let dir = self.mpu_dir(bucket, upload_id);
994 if let Ok(entries) = std::fs::read_dir(self.mpu_parts_dir(bucket, upload_id)) {
996 for entry in entries.flatten() {
997 let path = entry.path();
998 if let Some(fname) = path.file_name().and_then(|s| s.to_str()) {
999 if let Some(stem) = fname.strip_suffix(".bin") {
1000 if let Ok(n) = stem.parse::<u32>() {
1001 self.cache
1002 .invalidate(&Self::mpu_body_key(bucket, upload_id, n));
1003 }
1004 }
1005 }
1006 }
1007 }
1008 match std::fs::remove_dir_all(&dir) {
1009 Ok(_) => Ok(()),
1010 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1011 Err(e) => Err(e.into()),
1012 }
1013 }
1014
1015 fn mpu_complete(
1016 &self,
1017 bucket: &str,
1018 upload_id: &str,
1019 final_key: &str,
1020 version: Option<&str>,
1021 meta: &ObjectMeta,
1022 ) -> StoreResult<BodyRef> {
1023 let parts_dir = self.mpu_parts_dir(bucket, upload_id);
1024 if !parts_dir.exists() {
1025 return Err(io_other(format!(
1026 "mpu_complete: no parts dir for upload {}",
1027 upload_id
1028 )));
1029 }
1030
1031 let mut part_numbers: Vec<u32> = Vec::new();
1033 for entry in std::fs::read_dir(&parts_dir)? {
1034 let entry = entry?;
1035 let path = entry.path();
1036 let Some(fname) = path.file_name().and_then(|s| s.to_str()) else {
1037 continue;
1038 };
1039 if let Some(stem) = fname.strip_suffix(".toml") {
1040 if let Ok(n) = stem.parse::<u32>() {
1041 if self.mpu_part_bin(bucket, upload_id, n).exists() {
1042 part_numbers.push(n);
1043 }
1044 }
1045 }
1046 }
1047 part_numbers.sort_unstable();
1048 if part_numbers.is_empty() {
1049 return Err(io_other(format!(
1050 "mpu_complete: upload {} has no parts",
1051 upload_id
1052 )));
1053 }
1054
1055 let (dir, bin_path, toml_path) = self.object_paths(bucket, final_key, version);
1056 std::fs::create_dir_all(&dir)?;
1057
1058 let total_size: u64 = if part_numbers.len() == 1 {
1059 let only = self.mpu_part_bin(bucket, upload_id, part_numbers[0]);
1060 let sz = std::fs::metadata(&only)?.len();
1061 match std::fs::rename(&only, &bin_path) {
1062 Ok(_) => {}
1063 Err(e) if e.raw_os_error() == Some(libc_exdev()) => {
1064 {
1066 let mut input = std::fs::File::open(&only)?;
1067 let tmp = {
1068 let mut os = bin_path.as_os_str().to_owned();
1069 os.push(".tmp");
1070 PathBuf::from(os)
1071 };
1072 let mut out = std::fs::OpenOptions::new()
1073 .write(true)
1074 .create(true)
1075 .truncate(true)
1076 .open(&tmp)?;
1077 std::io::copy(&mut input, &mut out)?;
1078 out.sync_all()?;
1079 std::fs::rename(&tmp, &bin_path)?;
1080 }
1081 let _ = std::fs::remove_file(&only);
1082 }
1083 Err(e) => return Err(e.into()),
1084 }
1085 sz
1086 } else {
1087 let tmp = {
1088 let mut os = bin_path.as_os_str().to_owned();
1089 os.push(".tmp");
1090 PathBuf::from(os)
1091 };
1092 let mut total: u64 = 0;
1093 {
1094 let mut out = std::fs::OpenOptions::new()
1095 .write(true)
1096 .create(true)
1097 .truncate(true)
1098 .open(&tmp)?;
1099 for n in &part_numbers {
1100 let part_path = self.mpu_part_bin(bucket, upload_id, *n);
1101 let mut input = std::fs::File::open(&part_path)?;
1102 let copied = std::io::copy(&mut input, &mut out)?;
1103 total += copied;
1104 }
1105 out.sync_all()?;
1106 }
1107 std::fs::rename(&tmp, &bin_path)?;
1108 if let Some(parent) = bin_path.parent() {
1109 if let Ok(dir_handle) = std::fs::File::open(parent) {
1110 let _ = dir_handle.sync_all();
1111 }
1112 }
1113 total
1114 };
1115
1116 crate::atomic::write_atomic_toml(&toml_path, meta)?;
1117
1118 for n in &part_numbers {
1120 self.cache
1121 .invalidate(&Self::mpu_body_key(bucket, upload_id, *n));
1122 }
1123 let mpu_dir = self.mpu_dir(bucket, upload_id);
1124 let _ = std::fs::remove_dir_all(&mpu_dir);
1125
1126 self.cache.invalidate(&crate::cache::BodyKey::new(
1131 bucket.to_string(),
1132 final_key.to_string(),
1133 version.map(|s| s.to_string()),
1134 ));
1135
1136 Ok(BodyRef::Disk {
1137 bucket: bucket.to_string(),
1138 key: final_key.to_string(),
1139 version: version.map(|s| s.to_string()),
1140 path: bin_path,
1141 size: total_size,
1142 })
1143 }
1144}
1145
/// Raw OS error code reported when a rename would cross filesystems/volumes.
///
/// Unix-like systems report `EXDEV`, which is 18 on Linux, macOS and the
/// BSDs. Windows reports `ERROR_NOT_SAME_DEVICE` (17) instead, so a fixed 18
/// would never match there and the copy fallback would never run.
fn libc_exdev() -> i32 {
    if cfg!(windows) {
        17
    } else {
        18
    }
}
1149
1150#[cfg(test)]
1151mod disk_tests {
1152 use super::*;
1153 use std::sync::Arc;
1154 use tempfile::TempDir;
1155
1156 fn new_store(tmp: &TempDir) -> DiskS3Store {
1157 let cache = Arc::new(crate::cache::BodyCache::new(1024 * 1024));
1158 DiskS3Store::new(tmp.path().to_path_buf(), cache)
1159 }
1160
1161 fn new_store_with_cache(
1162 tmp: &TempDir,
1163 cap: u64,
1164 ) -> (DiskS3Store, Arc<crate::cache::BodyCache>) {
1165 let cache = Arc::new(crate::cache::BodyCache::new(cap));
1166 (
1167 DiskS3Store::new(tmp.path().to_path_buf(), cache.clone()),
1168 cache,
1169 )
1170 }
1171
1172 fn sample_meta(key: &str, size: u64) -> ObjectMeta {
1173 ObjectMeta {
1174 key: key.to_string(),
1175 content_type: "application/octet-stream".to_string(),
1176 etag: "etag".to_string(),
1177 size,
1178 ..Default::default()
1179 }
1180 }
1181
/// Bucket metadata written with `put_bucket_meta` is returned by `load`.
#[test]
fn put_bucket_meta_roundtrip() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let written = BucketMeta {
        name: String::from("b1"),
        region: String::from("us-east-1"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b1", &written).unwrap();

    let state = store.load().unwrap();
    let read_back = &state.buckets.get("b1").unwrap().meta;
    assert_eq!(read_back.name, "b1");
    assert_eq!(read_back.region, "us-east-1");
    assert_eq!(read_back.versioning.as_deref(), Some("Enabled"));
}
1199
/// Writes each listed `BucketSubresource` variant and checks that its file
/// exists under the bucket directory and holds exactly the payload written.
#[test]
fn put_bucket_subresource_each_variant_writes_file() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let kinds = [
        BucketSubresource::Tags,
        BucketSubresource::Lifecycle,
        BucketSubresource::Cors,
        BucketSubresource::Policy,
        BucketSubresource::Notification,
        BucketSubresource::Logging,
        BucketSubresource::Website,
        BucketSubresource::PublicAccessBlock,
        BucketSubresource::ObjectLock,
        BucketSubresource::Replication,
        BucketSubresource::Ownership,
        BucketSubresource::Inventory,
        BucketSubresource::Encryption,
        BucketSubresource::Versioning,
        BucketSubresource::Acl,
        BucketSubresource::Accelerate,
    ];
    for kind in kinds {
        store
            .put_bucket_subresource("b", kind, "payload=true")
            .unwrap();
        let on_disk = store
            .bucket_dir("b")
            .join(DiskS3Store::subresource_filename(kind));
        assert!(on_disk.exists(), "{:?}", kind);
        let contents = std::fs::read_to_string(&on_disk).unwrap();
        assert_eq!(contents, "payload=true");
    }
}
1242
/// Deleting a bucket subresource removes its file, and deleting it a second
/// time still returns `Ok`.
#[test]
fn delete_bucket_subresource_removes_file() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let kind = BucketSubresource::Tags;
    store.put_bucket_subresource("b", kind, "x=1").unwrap();
    store.delete_bucket_subresource("b", kind).unwrap();

    let tags_file = store.bucket_dir("b").join("tags.toml");
    assert!(!tags_file.exists());

    // The file is already gone; a repeated delete must not error.
    store.delete_bucket_subresource("b", kind).unwrap();
}
1269
/// Stores an in-memory body, checks the returned `BodyRef::Disk` fields and
/// the bytes on disk, then confirms `load` reports the object with its size.
#[test]
fn put_object_bytes_roundtrip() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"hello world");
    let meta = sample_meta("k1", payload.len() as u64);
    let stored = store
        .put_object("b", "k1", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();
    if let BodyRef::Disk {
        bucket,
        key,
        size,
        path,
        ..
    } = &stored
    {
        assert_eq!(bucket, "b");
        assert_eq!(key, "k1");
        assert_eq!(*size, payload.len() as u64);
        assert_eq!(std::fs::read(path).unwrap(), payload.to_vec());
    } else {
        panic!("expected Disk");
    }

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let object = bucket_snapshot.objects.get("k1").unwrap();
    assert_eq!(object.meta.size, payload.len() as u64);
}
1308
/// `BodySource::FileCopy` must store the body while leaving the source file
/// present and unchanged.
#[test]
fn put_object_file_copy_source_leaves_src_in_place() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // The source lives in its own temp directory, separate from the store root.
    let source_dir = TempDir::new().unwrap();
    let src = source_dir.path().join("src.bin");
    std::fs::write(&src, b"copied-body").unwrap();

    let meta = sample_meta("k", 11);
    let stored = store
        .put_object("b", "k", None, BodySource::FileCopy(src.clone()), &meta)
        .unwrap();
    match &stored {
        BodyRef::Disk { path, size, .. } => {
            assert_eq!(*size, 11);
            assert_eq!(std::fs::read(path).unwrap(), b"copied-body");
        }
        _ => panic!("expected Disk bodyref"),
    }

    assert!(src.exists(), "source file must not be moved by FileCopy");
    assert_eq!(std::fs::read(&src).unwrap(), b"copied-body");
}
1339
/// Stores an object from `BodySource::File` and checks that the stored body
/// file holds the source bytes.
#[test]
fn put_object_file_source() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let src = tmp.path().join("src.bin");
    std::fs::write(&src, b"file-body").unwrap();
    let meta = sample_meta("k", 9);
    // `src` is not used again, so it is moved into the source instead of cloned.
    let body_ref = store
        .put_object("b", "k", None, BodySource::File(src), &meta)
        .unwrap();
    let path = match body_ref {
        BodyRef::Disk { path, .. } => path,
        // Name the expectation so a failure is diagnosable, as sibling tests do.
        _ => panic!("expected Disk"),
    };
    assert_eq!(std::fs::read(&path).unwrap(), b"file-body");
}
1365
/// `put_object_meta` must update the stored metadata without changing the
/// bytes of the object's body file.
#[test]
fn put_object_meta_only_keeps_bin() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"abc");
    let mut meta = sample_meta("k", 3);
    store
        .put_object("b", "k", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    let (_, bin_path, _) = store.object_paths("b", "k", None);
    let body_before = std::fs::read(&bin_path).unwrap();

    meta.tags.insert("x".to_string(), "y".to_string());
    store.put_object_meta("b", "k", None, &meta).unwrap();
    assert_eq!(std::fs::read(&bin_path).unwrap(), body_before);

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let object = bucket_snapshot.objects.get("k").unwrap();
    assert_eq!(object.meta.tags.get("x").map(String::as_str), Some("y"));
}
1393
/// After `delete_object`, the body file, the metadata file, the object
/// directory and the cache entry must all be gone.
#[test]
fn delete_object_cleans_up_files_and_cache() {
    let root = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&root, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"bye");
    let meta = sample_meta("k", 3);
    store
        .put_object("b", "k", None, BodySource::Bytes(payload), &meta)
        .unwrap();

    let cache_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    // The test expects the put itself to have populated the cache.
    assert!(cache.get(&cache_key).is_some());

    store.delete_object("b", "k", None).unwrap();

    let (object_dir, bin_path, meta_path) = store.object_paths("b", "k", None);
    assert!(!bin_path.exists());
    assert!(!meta_path.exists());
    assert!(!object_dir.exists());
    assert!(cache.get(&cache_key).is_none());
}
1421
/// `open_object_body` returns the stored bytes both before and after the
/// cache entry is invalidated, and the second read repopulates the cache.
#[test]
fn open_object_body_cache_hit_and_refill() {
    let root = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&root, 1024 * 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from_static(b"payload");
    let meta = sample_meta("k", payload.len() as u64);
    let stored = store
        .put_object("b", "k", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    let first_read = store.open_object_body(&stored).unwrap();
    assert_eq!(first_read, payload);

    let cache_key = crate::cache::BodyKey::new("b".to_string(), "k".to_string(), None);
    cache.invalidate(&cache_key);
    assert!(cache.get(&cache_key).is_none());

    let second_read = store.open_object_body(&stored).unwrap();
    assert_eq!(second_read, payload);
    assert!(cache.get(&cache_key).is_some());
}
1451
/// With a cache built with capacity 1024, an 800-byte body is expected to be
/// absent from the cache both after the put and after it is read back.
#[test]
fn open_object_body_large_bypasses_cache() {
    let root = TempDir::new().unwrap();
    let (store, cache) = new_store_with_cache(&root, 1024);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let payload = Bytes::from(vec![7u8; 800]);
    let meta = sample_meta("big", 800);
    let stored = store
        .put_object("b", "big", None, BodySource::Bytes(payload.clone()), &meta)
        .unwrap();

    let cache_key = crate::cache::BodyKey::new("b".to_string(), "big".to_string(), None);
    assert!(cache.get(&cache_key).is_none());

    let read_back = store.open_object_body(&stored).unwrap();
    assert_eq!(read_back, payload);
    assert!(cache.get(&cache_key).is_none());
}
1478
/// Loading from a fresh, empty root directory succeeds and yields no buckets.
#[test]
fn load_empty_dir() {
    let root = TempDir::new().unwrap();
    let state = new_store(&root).load().unwrap();
    assert!(state.buckets.is_empty());
}
1486
/// An empty `mpu/upload1` directory under the bucket must not show up as a
/// multipart upload after `load`, while the regular object is still loaded.
#[test]
fn load_skips_mpu_without_init() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let meta = sample_meta("k", 1);
    let payload = Bytes::from_static(b"x");
    store
        .put_object("b", "k", None, BodySource::Bytes(payload), &meta)
        .unwrap();

    // Create the upload directory by hand, with nothing inside it.
    let orphan_upload_dir = store.bucket_dir("b").join("mpu").join("upload1");
    std::fs::create_dir_all(&orphan_upload_dir).unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    assert_eq!(bucket_snapshot.objects.len(), 1);
    assert!(bucket_snapshot.multipart_uploads.is_empty());
}
1514
/// Subresources written through the store are returned by `load`, keyed by
/// their file name, with the payload unchanged.
#[test]
fn load_reads_bucket_subresources() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let written = [
        (BucketSubresource::Lifecycle, "<Lifecycle/>"),
        (BucketSubresource::Cors, "<Cors/>"),
        (BucketSubresource::Policy, "{}"),
        (BucketSubresource::Tags, "x=1"),
    ];
    for (kind, payload) in written {
        store.put_bucket_subresource("b", kind, payload).unwrap();
    }

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let expected = [
        ("lifecycle.toml", "<Lifecycle/>"),
        ("cors.toml", "<Cors/>"),
        ("policy.toml", "{}"),
        ("tags.toml", "x=1"),
    ];
    for (file_name, payload) in expected {
        assert_eq!(
            bucket_snapshot
                .subresources
                .get(file_name)
                .map(String::as_str),
            Some(payload),
        );
    }
}
1559
/// Three versions of one key, written with increasing `last_modified`, are
/// loaded back in that order (v1, v2, v3), each with a non-empty disk body.
#[test]
fn versioned_put_load_roundtrip() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let base = chrono::Utc::now();
    let puts = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (i, &(vid, body)) in puts.iter().enumerate() {
        let meta = ObjectMeta {
            version_id: Some(vid.to_string()),
            last_modified: base + chrono::Duration::seconds(i as i64),
            ..sample_meta("k", body.len() as u64)
        };
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let versions = bucket_snapshot
        .object_versions
        .get("k")
        .expect("versions present");
    assert_eq!(versions.len(), 3);
    for (idx, &(vid, _)) in puts.iter().enumerate() {
        assert_eq!(versions[idx].meta.version_id.as_deref(), Some(vid));
    }
    for version in versions.iter() {
        if let BodyRef::Disk { size, .. } = &version.body {
            assert!(*size > 0);
        } else {
            panic!("expected Disk body");
        }
    }
}
1606
/// After writing v1..v3 with increasing timestamps, `load` must expose the
/// newest version (v3) as the current object in `objects`.
#[test]
fn versioned_load_promotes_latest_live_to_snap_objects() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let base = chrono::Utc::now();
    let puts = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (i, &(vid, body)) in puts.iter().enumerate() {
        let meta = ObjectMeta {
            version_id: Some(vid.to_string()),
            last_modified: base + chrono::Duration::seconds(i as i64),
            ..sample_meta("k", body.len() as u64)
        };
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let current = bucket_snapshot
        .objects
        .get("k")
        .expect("current object promoted");
    assert_eq!(current.meta.version_id.as_deref(), Some("v3"));
}
1644
/// When the newest entry for a key is a delete marker, `load` must leave the
/// key out of `objects` while still listing all four entries as versions.
#[test]
fn versioned_load_trailing_delete_marker_hides_current() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let base = chrono::Utc::now();
    let puts = [("v1", "one"), ("v2", "two"), ("v3", "three")];
    for (i, &(vid, body)) in puts.iter().enumerate() {
        let meta = ObjectMeta {
            version_id: Some(vid.to_string()),
            last_modified: base + chrono::Duration::seconds(i as i64),
            ..sample_meta("k", body.len() as u64)
        };
        let source = BodySource::Bytes(Bytes::copy_from_slice(body.as_bytes()));
        store.put_object("b", "k", Some(vid), source, &meta).unwrap();
    }

    // The delete marker is timestamped later than every live version.
    let marker = ObjectMeta {
        version_id: Some("dm1".to_string()),
        is_delete_marker: true,
        last_modified: base + chrono::Duration::seconds(10),
        ..sample_meta("k", 0)
    };
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    assert!(
        !bucket_snapshot.objects.contains_key("k"),
        "trailing delete marker must hide current object",
    );
    assert_eq!(bucket_snapshot.object_versions.get("k").unwrap().len(), 4);
}
1693
/// An object written without a version id, followed by newer versioned
/// writes after versioning is enabled, must load with the newest version
/// (v2) as the current object.
#[test]
fn legacy_null_object_overridden_by_newer_versions() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let unversioned_bucket = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &unversioned_bucket).unwrap();

    let base = chrono::Utc::now();
    let null_meta = ObjectMeta {
        last_modified: base,
        ..sample_meta("k", 3)
    };
    let null_body = BodySource::Bytes(Bytes::from_static(b"old"));
    store.put_object("b", "k", None, null_body, &null_meta).unwrap();

    // Rewrite the bucket metadata with versioning switched on.
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    let first = ObjectMeta {
        version_id: Some("v1".to_string()),
        last_modified: base + chrono::Duration::seconds(1),
        ..sample_meta("k", 3)
    };
    let first_body = BodySource::Bytes(Bytes::from_static(b"new"));
    store
        .put_object("b", "k", Some("v1"), first_body, &first)
        .unwrap();

    let second = ObjectMeta {
        version_id: Some("v2".to_string()),
        last_modified: base + chrono::Duration::seconds(2),
        ..sample_meta("k", 5)
    };
    let second_body = BodySource::Bytes(Bytes::from_static(b"newer"));
    store
        .put_object("b", "k", Some("v2"), second_body, &second)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let current = bucket_snapshot
        .objects
        .get("k")
        .expect("latest live version must override stale null");
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(current.meta.size, 5);
}
1765
/// An object written without a version id, then a versioned write, then a
/// newer delete marker: `load` must not expose the key in `objects`.
#[test]
fn legacy_null_hidden_by_trailing_delete_marker() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let unversioned_bucket = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &unversioned_bucket).unwrap();

    let base = chrono::Utc::now();
    let null_meta = ObjectMeta {
        last_modified: base,
        ..sample_meta("k", 3)
    };
    let null_body = BodySource::Bytes(Bytes::from_static(b"old"));
    store.put_object("b", "k", None, null_body, &null_meta).unwrap();

    // Rewrite the bucket metadata with versioning switched on.
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    let first = ObjectMeta {
        version_id: Some("v1".to_string()),
        last_modified: base + chrono::Duration::seconds(1),
        ..sample_meta("k", 3)
    };
    let first_body = BodySource::Bytes(Bytes::from_static(b"new"));
    store
        .put_object("b", "k", Some("v1"), first_body, &first)
        .unwrap();

    let marker = ObjectMeta {
        version_id: Some("dm1".to_string()),
        is_delete_marker: true,
        last_modified: base + chrono::Duration::seconds(2),
        ..sample_meta("k", 0)
    };
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    assert!(
        !bucket_snapshot.objects.contains_key("k"),
        "trailing delete marker must hide even a legacy null object",
    );
}
1827
/// A delete marker is persisted as metadata only (no `.bin` file) and loads
/// back as a single version with an empty in-memory body.
#[test]
fn delete_marker_roundtrip_no_body_file() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let marker = ObjectMeta {
        version_id: Some("dm1".to_string()),
        is_delete_marker: true,
        ..sample_meta("k", 0)
    };
    store
        .put_object("b", "k", Some("dm1"), BodySource::Bytes(Bytes::new()), &marker)
        .unwrap();

    let (_, bin_path, meta_path) = store.object_paths("b", "k", Some("dm1"));
    assert!(!bin_path.exists(), "delete marker must not have a .bin file");
    assert!(meta_path.exists());

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let versions = bucket_snapshot.object_versions.get("k").unwrap();
    assert_eq!(versions.len(), 1);

    let only = &versions[0];
    assert!(only.meta.is_delete_marker);
    if let BodyRef::Memory(bytes) = &only.body {
        assert_eq!(bytes.len(), 0);
    } else {
        panic!("delete marker body should be empty Memory");
    }
}
1868
/// One unversioned bucket and one versioned bucket in the same store load
/// independently: "a" has a single object and no versions, "b" has two
/// versions with v2 as the current object.
#[test]
fn mixed_nonversioned_and_versioned_buckets() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);

    let plain_bucket = BucketMeta {
        name: String::from("a"),
        ..Default::default()
    };
    store.put_bucket_meta("a", &plain_bucket).unwrap();
    let versioned_bucket = BucketMeta {
        name: String::from("b"),
        versioning: Some(String::from("Enabled")),
        ..Default::default()
    };
    store.put_bucket_meta("b", &versioned_bucket).unwrap();

    let plain_meta = sample_meta("only", 3);
    let plain_body = BodySource::Bytes(Bytes::from_static(b"aaa"));
    store
        .put_object("a", "only", None, plain_body, &plain_meta)
        .unwrap();

    let base = chrono::Utc::now();
    for (i, &vid) in ["v1", "v2"].iter().enumerate() {
        let meta = ObjectMeta {
            version_id: Some(vid.to_string()),
            last_modified: base + chrono::Duration::seconds(i as i64),
            ..sample_meta("twice", 2)
        };
        let body = BodySource::Bytes(Bytes::from_static(b"xx"));
        store.put_object("b", "twice", Some(vid), body, &meta).unwrap();
    }

    let state = store.load().unwrap();
    assert_eq!(state.buckets.len(), 2);

    let plain = state.buckets.get("a").unwrap();
    assert_eq!(plain.objects.len(), 1);
    assert!(plain.object_versions.is_empty());

    let versioned = state.buckets.get("b").unwrap();
    let current = versioned.objects.get("twice").unwrap();
    assert_eq!(current.meta.version_id.as_deref(), Some("v2"));
    assert_eq!(versioned.object_versions.get("twice").unwrap().len(), 2);
}
1931
1932 fn sample_mpu_init(upload_id: &str, key: &str) -> MpuInit {
1933 MpuInit {
1934 upload_id: upload_id.to_string(),
1935 key: key.to_string(),
1936 initiated: chrono::Utc::now(),
1937 content_type: "application/octet-stream".to_string(),
1938 storage_class: "STANDARD".to_string(),
1939 ..Default::default()
1940 }
1941 }
1942
/// A multipart upload that was created but has no parts is returned by
/// `load` with its init data and an empty part map.
#[test]
fn mpu_create_then_load_empty_parts() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    store
        .mpu_create("b", "up1", &sample_mpu_init("up1", "k1"))
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let upload = bucket_snapshot
        .multipart_uploads
        .get("up1")
        .expect("upload present");
    assert_eq!(upload.init.upload_id, "up1");
    assert_eq!(upload.init.key, "k1");
    assert!(upload.parts.is_empty());
}
1965
/// Three uploaded parts are returned by `load` under part numbers 1..=3 with
/// matching size, etag and on-disk bytes.
#[test]
fn mpu_put_part_then_load_three_parts() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let bodies = [
        Bytes::from_static(b"part-one"),
        Bytes::from_static(b"part-two-longer"),
        Bytes::from_static(b"p3"),
    ];
    // Part numbers start at 1 and follow the order of `bodies`.
    for (part_number, body) in (1u32..).zip(bodies.iter()) {
        let etag = format!("et{}", part_number);
        store
            .mpu_put_part("b", "up", part_number, BodySource::Bytes(body.clone()), &etag)
            .unwrap();
    }

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let upload = bucket_snapshot.multipart_uploads.get("up").unwrap();
    assert_eq!(upload.parts.len(), 3);
    for (part_number, body) in (1u32..).zip(bodies.iter()) {
        let part = upload.parts.get(&part_number).unwrap();
        assert_eq!(part.meta.part_number, part_number);
        assert_eq!(part.meta.size, body.len() as u64);
        assert_eq!(part.meta.etag, format!("et{}", part_number));
        if let BodyRef::Disk { path, size, .. } = &part.body {
            assert_eq!(*size, body.len() as u64);
            assert_eq!(std::fs::read(path).unwrap(), body.to_vec());
        } else {
            panic!("expected Disk");
        }
    }
}
2018
/// Aborting an upload removes its directory, a second abort still returns
/// `Ok`, and `load` reports no multipart uploads afterwards.
#[test]
fn mpu_abort_removes_upload() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let part_body = BodySource::Bytes(Bytes::from_static(b"x"));
    store.mpu_put_part("b", "up", 1, part_body, "e").unwrap();

    store.mpu_abort("b", "up").unwrap();
    assert!(!store.mpu_dir("b", "up").exists());

    // The upload no longer exists; aborting again must not error.
    store.mpu_abort("b", "up").unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    assert!(bucket_snapshot.multipart_uploads.is_empty());
}
2052
/// Completing an upload that has exactly one part yields a disk body equal
/// to that part and removes the upload directory.
#[test]
fn mpu_complete_single_part_fast_path() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let only_part = Bytes::from_static(b"only-part-bytes");
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(only_part.clone()), "et")
        .unwrap();

    let meta = sample_meta("k", only_part.len() as u64);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    if let BodyRef::Disk { path, size, .. } = &completed {
        assert_eq!(*size, only_part.len() as u64);
        assert_eq!(std::fs::read(path).unwrap(), only_part.to_vec());
    } else {
        panic!("expected Disk");
    }
    assert!(!store.mpu_dir("b", "up").exists());
}
2084
/// Completing a three-part upload produces a body that is the parts
/// concatenated in part-number order, and removes the upload directory.
#[test]
fn mpu_complete_multi_part_concat() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let parts = [
        Bytes::from_static(b"AAAA"),
        Bytes::from_static(b"BBBBBB"),
        Bytes::from_static(b"CC"),
    ];
    for (part_number, part) in (1u32..).zip(parts.iter()) {
        store
            .mpu_put_part("b", "up", part_number, BodySource::Bytes(part.clone()), "e")
            .unwrap();
    }

    let mut expected = Vec::new();
    for part in parts.iter() {
        expected.extend_from_slice(part);
    }

    let meta = sample_meta("k", expected.len() as u64);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let body_path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk")
    };
    assert_eq!(std::fs::read(&body_path).unwrap(), expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2125
/// Uploads three 1 MiB parts from files (each filled with a distinct byte),
/// completes the upload, and checks that the result is their concatenation.
#[test]
fn mpu_complete_large_streaming_via_file_source() {
    const PART_SIZE: usize = 1024 * 1024;

    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();

    let fill_bytes: [u8; 3] = [0x11, 0x22, 0x33];
    let mut expected: Vec<u8> = Vec::with_capacity(PART_SIZE * 3);
    for (part_number, fill) in (1u32..).zip(fill_bytes.iter()) {
        let src = root.path().join(format!("src-{}.bin", part_number));
        let chunk = vec![*fill; PART_SIZE];
        std::fs::write(&src, &chunk).unwrap();
        expected.extend_from_slice(&chunk);
        store
            .mpu_put_part("b", "up", part_number, BodySource::File(src), "et")
            .unwrap();
    }

    let meta = sample_meta("k", expected.len() as u64);
    let completed = store.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let body_path = if let BodyRef::Disk { path, size, .. } = completed {
        assert_eq!(size, expected.len() as u64);
        path
    } else {
        panic!("expected Disk")
    };

    let actual = std::fs::read(&body_path).unwrap();
    // Compare lengths first so a size mismatch fails before the full compare.
    assert_eq!(actual.len(), expected.len());
    assert_eq!(actual, expected);
    assert!(!store.mpu_dir("b", "up").exists());
}
2171
/// A multipart upload started through one store instance can be loaded,
/// extended with another part and completed through a second store instance
/// opened on the same directory.
#[test]
fn mpu_resumable_across_load() {
    let tmp = TempDir::new().unwrap();
    let store = new_store(&tmp);
    let bucket_meta = BucketMeta {
        name: "b".to_string(),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();
    store
        .mpu_create("b", "up", &sample_mpu_init("up", "k"))
        .unwrap();
    let p1 = Bytes::from_static(b"hello-");
    let p2 = Bytes::from_static(b"world-");
    store
        .mpu_put_part("b", "up", 1, BodySource::Bytes(p1.clone()), "e1")
        .unwrap();
    store
        .mpu_put_part("b", "up", 2, BodySource::Bytes(p2.clone()), "e2")
        .unwrap();

    // A second store over the same root, with its own cache.
    let store2 = new_store_with_cache(&tmp, 1024 * 1024).0;
    let loaded = store2.load().unwrap();
    let snap = loaded.buckets.get("b").unwrap();
    let m = snap.multipart_uploads.get("up").unwrap();
    assert_eq!(m.parts.len(), 2);
    assert_eq!(m.parts.get(&1).unwrap().meta.etag, "e1");
    assert_eq!(m.parts.get(&2).unwrap().meta.etag, "e2");

    // Continue the upload through the second store and complete it there.
    let p3 = Bytes::from_static(b"again!");
    store2
        .mpu_put_part("b", "up", 3, BodySource::Bytes(p3.clone()), "e3")
        .unwrap();
    let mut expected = Vec::new();
    expected.extend_from_slice(&p1);
    expected.extend_from_slice(&p2);
    expected.extend_from_slice(&p3);
    let meta = sample_meta("k", expected.len() as u64);
    let body_ref = store2.mpu_complete("b", "up", "k", None, &meta).unwrap();
    let path = match body_ref {
        BodyRef::Disk { path, .. } => path,
        // Name the expectation so a failure is diagnosable, as sibling tests do.
        _ => panic!("expected Disk"),
    };
    assert_eq!(std::fs::read(&path).unwrap(), expected);
    assert!(!store2.mpu_dir("b", "up").exists());
}
2224
/// A TOML-serialized `TagsSnapshot` stored as the Tags subresource is
/// returned by `load` under "tags.toml" and decodes to the same tag map.
#[test]
fn tags_snapshot_roundtrip_via_store() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let tags: HashMap<String, String> = [("env", "prod"), ("team", "s3")]
        .iter()
        .map(|&(name, value)| (name.to_string(), value.to_string()))
        .collect();
    let snapshot = TagsSnapshot { tags: tags.clone() };
    let payload = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Tags, &payload)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let text = bucket_snapshot
        .subresources
        .get("tags.toml")
        .cloned()
        .unwrap();
    let decoded: TagsSnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.tags, tags);
}
2258
/// A TOML-serialized `AclSnapshot` stored as the Acl subresource is returned
/// by `load` under "acl.toml" and decodes with the owner and both grants.
#[test]
fn acl_snapshot_roundtrip_via_store() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let owner_grant = AclGrantSnapshot {
        grantee_type: "CanonicalUser".to_string(),
        grantee_id: Some("owner-abc".to_string()),
        grantee_display_name: Some("owner".to_string()),
        grantee_uri: None,
        permission: "FULL_CONTROL".to_string(),
    };
    let all_users_grant = AclGrantSnapshot {
        grantee_type: "Group".to_string(),
        grantee_id: None,
        grantee_display_name: None,
        grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
        permission: "READ".to_string(),
    };
    let snapshot = AclSnapshot {
        owner_id: "owner-abc".to_string(),
        grants: vec![owner_grant, all_users_grant],
    };
    let payload = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Acl, &payload)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let text = bucket_snapshot
        .subresources
        .get("acl.toml")
        .cloned()
        .unwrap();
    let decoded: AclSnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.owner_id, "owner-abc");
    assert_eq!(decoded.grants.len(), 2);
    assert_eq!(decoded.grants[0].permission, "FULL_CONTROL");
    assert_eq!(decoded.grants[1].grantee_type, "Group");
}
2312
/// A TOML-serialized `InventorySnapshot` stored as the Inventory subresource
/// is returned by `load` under "inventory.toml" and decodes to the same
/// configuration map.
#[test]
fn inventory_snapshot_roundtrip_via_store() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    let configs: HashMap<String, String> = [
        ("inv-1", "<InventoryConfiguration id=\"inv-1\"/>"),
        ("inv-2", "<InventoryConfiguration id=\"inv-2\"/>"),
    ]
    .iter()
    .map(|&(id, xml)| (id.to_string(), xml.to_string()))
    .collect();
    let snapshot = InventorySnapshot {
        configs: configs.clone(),
    };
    let payload = toml::to_string(&snapshot).unwrap();
    store
        .put_bucket_subresource("b", BucketSubresource::Inventory, &payload)
        .unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    let text = bucket_snapshot
        .subresources
        .get("inventory.toml")
        .cloned()
        .unwrap();
    let decoded: InventorySnapshot = toml::from_str(&text).unwrap();
    assert_eq!(decoded.configs, configs);
}
2354
/// A hand-written `versioning.toml` containing "Enabled" in the bucket
/// directory is surfaced by `load` as the bucket's `versioning` value, even
/// though the bucket metadata was written without one.
#[test]
fn legacy_versioning_file_is_read() {
    let root = TempDir::new().unwrap();
    let store = new_store(&root);
    let bucket_meta = BucketMeta {
        name: String::from("b"),
        ..Default::default()
    };
    store.put_bucket_meta("b", &bucket_meta).unwrap();

    // Write the file directly rather than through the store API.
    let versioning_file = store.bucket_dir("b").join("versioning.toml");
    std::fs::write(&versioning_file, "Enabled").unwrap();

    let state = store.load().unwrap();
    let bucket_snapshot = state.buckets.get("b").unwrap();
    assert_eq!(bucket_snapshot.meta.versioning.as_deref(), Some("Enabled"));
}
2376}